This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.8 by this push:
new f2b428c MINOR: Add ClientQuotaMetadataManager for processing
QuotaRecord (#10101)
f2b428c is described below
commit f2b428cbdb9cf84feca90d9c7718cc756df734b6
Author: David Arthur <[email protected]>
AuthorDate: Wed Feb 10 17:04:22 2021 -0500
MINOR: Add ClientQuotaMetadataManager for processing QuotaRecord (#10101)
This PR brings in the new broker metadata processor for handling
QuotaRecords coming from the metadata log. Also included is a new cache class
that allows fast lookups of quotas on the broker when handling
DescribeClientQuotas requests.
Reviewers: Colin P. McCabe <[email protected]>
---
.../kafka/server/metadata/ClientQuotaCache.scala | 297 ++++++++++++++
.../metadata/ClientQuotaMetadataManager.scala | 174 ++++++++
.../metadata/ClientQuotaMetadataManagerTest.scala | 452 +++++++++++++++++++++
3 files changed, 923 insertions(+)
diff --git a/core/src/main/scala/kafka/server/metadata/ClientQuotaCache.scala
b/core/src/main/scala/kafka/server/metadata/ClientQuotaCache.scala
new file mode 100644
index 0000000..ad2378e
--- /dev/null
+++ b/core/src/main/scala/kafka/server/metadata/ClientQuotaCache.scala
@@ -0,0 +1,297 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server.metadata
+
+import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
+import org.apache.kafka.common.errors.{InvalidRequestException,
UnsupportedVersionException}
+import org.apache.kafka.common.quota.{ClientQuotaEntity,
ClientQuotaFilterComponent}
+
+import java.util.concurrent.locks.ReentrantReadWriteLock
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+
+// A type for the cache index keys. Each key names one part of a quota entity (user, client ID or IP): either a specific name or the default entity of that type.
+sealed trait CacheIndexKey
+case object DefaultUser extends CacheIndexKey // user index key for entities whose user part is the default user
+case class SpecificUser(user: String) extends CacheIndexKey // user index key for entities naming this user
+case object DefaultClientId extends CacheIndexKey // client ID index key for entities whose client ID part is the default
+case class SpecificClientId(clientId: String) extends CacheIndexKey // client ID index key for entities naming this client ID
+case object DefaultIp extends CacheIndexKey // IP index key for the default IP entity
+case class SpecificIp(ip: String) extends CacheIndexKey // IP index key for the entity naming this IP
+
+
+// Different types of matching constraints that a describe filter component can place on one entity type
+sealed trait QuotaMatch
+case class ExactMatch(entityName: String) extends QuotaMatch // match only the entity with exactly this name
+case object DefaultMatch extends QuotaMatch // match only the default entity of the type
+case object TypeMatch extends QuotaMatch // match every entity of the type, named or default
+
+
+/**
+ * Maintains a cache of QuotaEntity and their respective quotas.
+ *
+ * The main cache is structured like:
+ *
+ * <pre>
+ * {
+ * (user:alice) -> {consumer_byte_rate: 10000},
+ * (user:alice,client:x) -> {consumer_byte_rate: 8000, producer_byte_rate:
8000}
+ * }
+ * </pre>
+ *
+ * In addition to this cache, this class maintains three indexes for the three
supported entity types (user, client,
+ * and IP). These indexes map a part of an entity to the list of all
QuotaEntity which contain that entity. For example:
+ *
+ * <pre>
+ * {
+ * SpecificUser(alice) -> [(user:alice), (user:alice,client:x)],
+ * DefaultUser -> [(user:default), (user:default, client:x)]
+ * }
+ * </pre>
+ *
+ * These indexes exist to support the flexible lookups needed by the
DescribeClientQuotas RPC.
+ */
+class ClientQuotaCache {
+ private type QuotaCacheIndex = mutable.HashMap[CacheIndexKey,
mutable.HashSet[QuotaEntity]]
+
+ private val quotaCache = new mutable.HashMap[QuotaEntity,
mutable.Map[String, Double]]
+
+ // We need three separate indexes because we also support wildcard lookups
on entity type.
+ private val userEntityIndex = new QuotaCacheIndex
+ private val clientIdEntityIndex = new QuotaCacheIndex
+ private val ipEntityIndex = new QuotaCacheIndex
+
+ private val lock = new ReentrantReadWriteLock()
+
+  /**
+   * Look up the cached quotas of every entity that satisfies the given filter. The answer is served from an
+   * in-memory cache and may therefore lag behind the controller's latest view of the quotas. A filter on an
+   * unsupported entity type, or an invalid combination of entity types, causes an exception to be thrown.
+   *
+   * @param filters  A collection of quota filters (entity type and a match clause).
+   * @param strict   True if only entities which match all of the filter clauses, and have no additional
+   *                 unmatched parts, should be returned.
+   * @return         A mapping of quota entities along with their quota values.
+   */
+  def describeClientQuotas(filters: Seq[ClientQuotaFilterComponent], strict: Boolean):
+      Map[ClientQuotaEntity, Map[String, Double]] = inReadLock(lock) {
+    val internalResults = describeClientQuotasInternal(filters, strict)
+    // Translate the internal entity representation into the public ClientQuotaEntity form
+    internalResults.map { case (quotaEntity, quotas) => (convertEntity(quotaEntity), quotas) }
+  }
+
+  // Visible for testing (QuotaEntity is nicer for assertions in test code)
+  /**
+   * Validate the filter components, resolve them against the entity indexes and return the cached quotas of
+   * every matching entity, keyed by the internal [[QuotaEntity]] representation.
+   *
+   * @param filters  A collection of quota filters (entity type and a match clause).
+   * @param strict   True if entities carrying an entity type that the filter does not mention must be excluded.
+   * @return         The matching entities with an immutable snapshot of their quotas; empty for an empty filter.
+   * @throws InvalidRequestException     on a duplicate or empty entity type, or an IP filter combined with
+   *                                     any other filter component
+   * @throws UnsupportedVersionException on an entity type that ClientQuotaEntity does not consider valid
+   */
+  private[metadata] def describeClientQuotasInternal(filters: Seq[ClientQuotaFilterComponent], strict: Boolean):
+      Map[QuotaEntity, Map[String, Double]] = inReadLock(lock) {
+
+    // Do some preliminary validation of the filter types and convert them to correct QuotaMatch type
+    val entityFilters = mutable.HashMap.empty[String, QuotaMatch]
+    filters.foreach { component =>
+      val entityType = component.entityType()
+      if (entityFilters.contains(entityType)) {
+        throw new InvalidRequestException(s"Duplicate ${entityType} filter component entity type")
+      } else if (entityType.isEmpty) {
+        throw new InvalidRequestException("Unexpected empty filter component entity type")
+      } else if (!ClientQuotaEntity.isValidEntityType(entityType)) {
+        throw new UnsupportedVersionException(s"Custom entity type ${entityType} not supported")
+      }
+
+      // A present "match()" is an exact match on name, an absent "match()" is a match on the default entity,
+      // and a null "match()" is a match on the entity type
+      val entityMatch: QuotaMatch = Option(component.`match`()) match {
+        case Some(name) if name.isPresent => ExactMatch(name.get())
+        case Some(_) => DefaultMatch
+        case None => TypeMatch
+      }
+      entityFilters.put(entityType, entityMatch)
+    }
+
+    val ipMatch = entityFilters.get(ClientQuotaEntity.IP)
+    val userMatch = entityFilters.get(ClientQuotaEntity.USER)
+    val clientMatch = entityFilters.get(ClientQuotaEntity.CLIENT_ID)
+
+    // The empty filter is expressed as a branch of this expression rather than an early "return": a return
+    // here would be a non-local return out of the by-name block handed to inReadLock.
+    val matchingEntities: Set[QuotaEntity] =
+      if (entityFilters.isEmpty) {
+        Set.empty
+      } else if (ipMatch.isDefined) {
+        // We do not allow IP filters to be combined with user or client filters
+        if (entityFilters.size > 1) {
+          throw new InvalidRequestException("Invalid entity filter component combination, IP filter component should " +
+            "not be used with user or clientId filter component.")
+        }
+        lookupIndex(ipEntityIndex, ipMatch, SpecificIp(_), DefaultIp)
+      } else if (userMatch.isDefined || clientMatch.isDefined) {
+        // If either are present, check both user and client indexes
+        val userIndexMatches = lookupIndex(userEntityIndex, userMatch, SpecificUser(_), DefaultUser)
+        val clientIndexMatches =
+          lookupIndex(clientIdEntityIndex, clientMatch, SpecificClientId(_), DefaultClientId)
+
+        val bothSpecified = userMatch.isDefined && clientMatch.isDefined
+        val candidateMatches =
+          if (bothSpecified) userIndexMatches.intersect(clientIndexMatches)
+          else if (userMatch.isDefined) userIndexMatches
+          else clientIndexMatches
+
+        if (strict) {
+          // If in strict mode, we need to remove any matches with unspecified entity types. This only applies
+          // to results with more than one entity part (i.e., user and clientId)
+          candidateMatches.filter {
+            case ExplicitUserExplicitClientIdEntity(_, _) | DefaultUserExplicitClientIdEntity(_) |
+                 ExplicitUserDefaultClientIdEntity(_) | DefaultUserDefaultClientIdEntity => bothSpecified
+            case _ => true
+          }
+        } else {
+          candidateMatches
+        }
+      } else {
+        // ClientQuotaEntity.isValidEntityType check above should prevent any unknown entity types
+        throw new IllegalStateException(s"Unexpected handling of ${entityFilters} after filter validation")
+      }
+
+    // Snapshot the quotas so callers never observe the mutable maps held by the cache
+    matchingEntities.map { quotaEntity =>
+      quotaEntity -> quotaCache.get(quotaEntity).fold(Map.empty[String, Double])(_.toMap)
+    }.toMap
+  }
+
+  // Resolve one filter clause against one index; an absent clause matches nothing.
+  private def lookupIndex(index: QuotaCacheIndex,
+                          quotaMatch: Option[QuotaMatch],
+                          specificKey: String => CacheIndexKey,
+                          defaultKey: CacheIndexKey): Set[QuotaEntity] = {
+    def entitiesFor(indexKey: CacheIndexKey): Set[QuotaEntity] =
+      index.get(indexKey).fold(Set.empty[QuotaEntity])(_.toSet)
+
+    quotaMatch match {
+      case Some(ExactMatch(name)) => entitiesFor(specificKey(name))
+      case Some(DefaultMatch) => entitiesFor(defaultKey)
+      case Some(TypeMatch) => index.values.flatten.toSet
+      case None => Set.empty
+    }
+  }
+
+  // Translate the internal entity representation into the public ClientQuotaEntity form.
+  // A null entity name denotes the default entity of that type.
+  private def convertEntity(entity: QuotaEntity): ClientQuotaEntity = {
+    def single(entityType: String, name: String): Map[String, String] =
+      Map(entityType -> name)
+
+    def userAndClient(user: String, clientId: String): Map[String, String] =
+      Map(ClientQuotaEntity.USER -> user, ClientQuotaEntity.CLIENT_ID -> clientId)
+
+    val entityParts: Map[String, String] = entity match {
+      case IpEntity(ip) => single(ClientQuotaEntity.IP, ip)
+      case DefaultIpEntity => single(ClientQuotaEntity.IP, null)
+      case UserEntity(user) => single(ClientQuotaEntity.USER, user)
+      case DefaultUserEntity => single(ClientQuotaEntity.USER, null)
+      case ClientIdEntity(clientId) => single(ClientQuotaEntity.CLIENT_ID, clientId)
+      case DefaultClientIdEntity => single(ClientQuotaEntity.CLIENT_ID, null)
+      case ExplicitUserExplicitClientIdEntity(user, clientId) => userAndClient(user, clientId)
+      case ExplicitUserDefaultClientIdEntity(user) => userAndClient(user, null)
+      case DefaultUserExplicitClientIdEntity(clientId) => userAndClient(null, clientId)
+      case DefaultUserDefaultClientIdEntity => userAndClient(null, null)
+    }
+    new ClientQuotaEntity(entityParts.asJava)
+  }
+
+  // Add the entity to, or remove it from, the set stored under `key` in the given index.
+  // A set that becomes empty after a removal is dropped from the index altogether.
+  private def updateCacheIndex(quotaEntity: QuotaEntity,
+                               remove: Boolean)
+                              (quotaCacheIndex: QuotaCacheIndex,
+                               key: CacheIndexKey): Unit = {
+    if (!remove) {
+      quotaCacheIndex.getOrElseUpdate(key, mutable.HashSet.empty).add(quotaEntity)
+    } else {
+      quotaCacheIndex.get(key).foreach { indexedEntities =>
+        indexedEntities.remove(quotaEntity)
+        if (indexedEntities.isEmpty) {
+          quotaCacheIndex.remove(key)
+        }
+      }
+    }
+  }
+
+  /**
+   * Apply a single quota change to the cache. When remove is set, the value is ignored and the quota stored
+   * under the given key is deleted from the entity; otherwise the key is set to the given value. Quota keys
+   * are not validated here, the caller is expected to have done so already.
+   *
+   * @param entity   A quota entity, either a specific entity or the default entity for the given type(s)
+   * @param key      The quota key
+   * @param value    The quota value
+   * @param remove   True if we should remove the given quota key from the entity's quota cache
+   */
+  def updateQuotaCache(entity: QuotaEntity, key: String, value: Double, remove: Boolean): Unit = inWriteLock(lock) {
+    val entityQuotas = quotaCache.getOrElseUpdate(entity, mutable.HashMap.empty)
+    if (remove) {
+      entityQuotas.remove(key)
+    } else {
+      entityQuotas.put(key, value)
+    }
+
+    // The entity only leaves the cache and the indexes once a removal has taken away its last quota
+    val removeFromIndex = remove && entityQuotas.isEmpty
+    if (removeFromIndex) {
+      quotaCache.remove(entity)
+    }
+
+    // Work out which index entries refer to this entity, then update each of them
+    val indexTargets: Seq[(QuotaCacheIndex, CacheIndexKey)] = entity match {
+      case UserEntity(user) =>
+        Seq((userEntityIndex, SpecificUser(user)))
+      case DefaultUserEntity =>
+        Seq((userEntityIndex, DefaultUser))
+
+      case ClientIdEntity(clientId) =>
+        Seq((clientIdEntityIndex, SpecificClientId(clientId)))
+      case DefaultClientIdEntity =>
+        Seq((clientIdEntityIndex, DefaultClientId))
+
+      case ExplicitUserExplicitClientIdEntity(user, clientId) =>
+        Seq((userEntityIndex, SpecificUser(user)), (clientIdEntityIndex, SpecificClientId(clientId)))
+      case ExplicitUserDefaultClientIdEntity(user) =>
+        Seq((userEntityIndex, SpecificUser(user)), (clientIdEntityIndex, DefaultClientId))
+      case DefaultUserExplicitClientIdEntity(clientId) =>
+        Seq((userEntityIndex, DefaultUser), (clientIdEntityIndex, SpecificClientId(clientId)))
+      case DefaultUserDefaultClientIdEntity =>
+        Seq((userEntityIndex, DefaultUser), (clientIdEntityIndex, DefaultClientId))
+
+      case IpEntity(ip) =>
+        Seq((ipEntityIndex, SpecificIp(ip)))
+      case DefaultIpEntity =>
+        Seq((ipEntityIndex, DefaultIp))
+    }
+    indexTargets.foreach { case (index, indexKey) =>
+      updateCacheIndex(entity, removeFromIndex)(index, indexKey)
+    }
+  }
+}
diff --git
a/core/src/main/scala/kafka/server/metadata/ClientQuotaMetadataManager.scala
b/core/src/main/scala/kafka/server/metadata/ClientQuotaMetadataManager.scala
new file mode 100644
index 0000000..76e0b54
--- /dev/null
+++ b/core/src/main/scala/kafka/server/metadata/ClientQuotaMetadataManager.scala
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server.metadata
+
+import kafka.network.ConnectionQuotas
+import kafka.server.ConfigEntityName
+import kafka.server.QuotaFactory.QuotaManagers
+import kafka.utils.Logging
+import org.apache.kafka.common.config.internals.QuotaConfigs
+import org.apache.kafka.common.metadata.QuotaRecord
+import org.apache.kafka.common.metrics.Quota
+import org.apache.kafka.common.quota.ClientQuotaEntity
+import org.apache.kafka.common.utils.Sanitizer
+
+import java.net.{InetAddress, UnknownHostException}
+import scala.collection.mutable
+
+
+// A strict hierarchy of entities that we support: an IP, a user, a client ID, or a user combined with a client ID, where each part is either a specific name or the default entity
+sealed trait QuotaEntity
+case class IpEntity(ip: String) extends QuotaEntity // a specific IP
+case object DefaultIpEntity extends QuotaEntity // the default IP entity
+case class UserEntity(user: String) extends QuotaEntity // a specific user, no client ID part
+case object DefaultUserEntity extends QuotaEntity // the default user, no client ID part
+case class ClientIdEntity(clientId: String) extends QuotaEntity // a specific client ID, no user part
+case object DefaultClientIdEntity extends QuotaEntity // the default client ID, no user part
+case class ExplicitUserExplicitClientIdEntity(user: String, clientId: String)
extends QuotaEntity // a specific user combined with a specific client ID
+case class ExplicitUserDefaultClientIdEntity(user: String) extends QuotaEntity // a specific user combined with the default client ID
+case class DefaultUserExplicitClientIdEntity(clientId: String) extends
QuotaEntity // the default user combined with a specific client ID
+case object DefaultUserDefaultClientIdEntity extends QuotaEntity // the default user combined with the default client ID
+
+/**
+ * Process quota metadata records as they appear in the metadata log and
update quota managers and cache as necessary
+ */
+class ClientQuotaMetadataManager(private[metadata] val quotaManagers:
QuotaManagers,
+ private[metadata] val connectionQuotas:
ConnectionQuotas,
+ private[metadata] val quotaCache:
ClientQuotaCache) extends Logging {
+
+  /**
+   * Work out which quota entity a record refers to and dispatch it to the matching handler. A record that
+   * carries an IP entity is treated as an IP quota; otherwise a record carrying a user and/or a client ID is
+   * treated as a user/client quota. Any other record is logged and ignored.
+   *
+   * @param quotaRecord  The quota record to apply
+   */
+  def handleQuotaRecord(quotaRecord: QuotaRecord): Unit = {
+    val namesByType = mutable.Map.empty[String, String]
+    quotaRecord.entity().forEach { entityData =>
+      namesByType.put(entityData.entityType(), entityData.entityName())
+    }
+
+    // Outer Option: is the entity type part of the record at all?
+    // Inner Option: None stands for a null entity name, i.e. the default entity of that type.
+    def nameOf(entityType: String): Option[Option[String]] =
+      namesByType.get(entityType).map(Option(_))
+
+    val ipName = nameOf(ClientQuotaEntity.IP)
+    val userName = nameOf(ClientQuotaEntity.USER)
+    val clientIdName = nameOf(ClientQuotaEntity.CLIENT_ID)
+
+    (ipName, userName, clientIdName) match {
+      case (Some(ip), _, _) =>
+        // In the IP quota manager, None is used for default entity
+        handleIpQuota(ip.fold[QuotaEntity](DefaultIpEntity)(IpEntity(_)), quotaRecord)
+
+      case (None, Some(user), Some(clientId)) =>
+        // In User+Client quota managers, "<default>" is used for default entity, so we need to represent
+        // all possible combinations of values and defaults
+        val userClientEntity: QuotaEntity = (user, clientId) match {
+          case (None, None) => DefaultUserDefaultClientIdEntity
+          case (None, Some(explicitClientId)) => DefaultUserExplicitClientIdEntity(explicitClientId)
+          case (Some(explicitUser), None) => ExplicitUserDefaultClientIdEntity(explicitUser)
+          case (Some(explicitUser), Some(explicitClientId)) =>
+            ExplicitUserExplicitClientIdEntity(explicitUser, explicitClientId)
+        }
+        handleUserClientQuota(userClientEntity, quotaRecord)
+
+      case (None, Some(user), None) =>
+        handleUserClientQuota(user.fold[QuotaEntity](DefaultUserEntity)(UserEntity(_)), quotaRecord)
+
+      case (None, None, Some(clientId)) =>
+        handleUserClientQuota(clientId.fold[QuotaEntity](DefaultClientIdEntity)(ClientIdEntity(_)), quotaRecord)
+
+      case (None, None, None) =>
+        warn(s"Ignoring unsupported quota entity ${quotaRecord.entity()}")
+    }
+  }
+
+  /**
+   * Apply an IP quota record to the quota cache and to the connection quotas. The entity's address is
+   * resolved first; records whose key is not the IP connection rate are then logged and ignored.
+   *
+   * @param ipEntity     Either an IpEntity or the DefaultIpEntity
+   * @param quotaRecord  The quota record to apply
+   * @throws IllegalArgumentException if the IP of the entity cannot be resolved
+   * @throws IllegalStateException    if the entity is not an IP entity
+   */
+  def handleIpQuota(ipEntity: QuotaEntity, quotaRecord: QuotaRecord): Unit = {
+    def resolve(ip: String): InetAddress =
+      try {
+        InetAddress.getByName(ip)
+      } catch {
+        case _: UnknownHostException => throw new IllegalArgumentException(s"Unable to resolve address $ip")
+      }
+
+    // None stands for the default IP entity
+    val inetAddress: Option[InetAddress] = ipEntity match {
+      case DefaultIpEntity => None
+      case IpEntity(ip) => Some(resolve(ip))
+      case _ => throw new IllegalStateException("Should only handle IP quota entities here")
+    }
+
+    // The connection quota only understands the connection rate limit
+    if (quotaRecord.key() == QuotaConfigs.IP_CONNECTION_RATE_OVERRIDE_CONFIG) {
+      // Update the cache
+      quotaCache.updateQuotaCache(ipEntity, quotaRecord.key, quotaRecord.value, quotaRecord.remove)
+
+      // Convert the value to an appropriate Option for the quota manager
+      val newValue = if (quotaRecord.remove()) None else Some(quotaRecord.value.toInt)
+      connectionQuotas.updateIpConnectionRateQuota(inetAddress, newValue)
+    } else {
+      warn(s"Ignoring unexpected quota key ${quotaRecord.key()} for entity $ipEntity")
+    }
+  }
+
+  /**
+   * Apply a user and/or client ID quota record to the quota manager selected by the record's key, then to
+   * the quota cache. Records with an unexpected key are logged and ignored.
+   *
+   * @param quotaEntity  A user, client ID, or user + client ID entity
+   * @param quotaRecord  The quota record to apply
+   * @throws IllegalStateException if an IP entity is passed in with a supported key
+   */
+  def handleUserClientQuota(quotaEntity: QuotaEntity, quotaRecord: QuotaRecord): Unit = {
+    val selectedManager = quotaRecord.key() match {
+      case QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG => Some(quotaManagers.fetch)
+      case QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG => Some(quotaManagers.produce)
+      case QuotaConfigs.REQUEST_PERCENTAGE_OVERRIDE_CONFIG => Some(quotaManagers.request)
+      case QuotaConfigs.CONTROLLER_MUTATION_RATE_OVERRIDE_CONFIG => Some(quotaManagers.controllerMutation)
+      case _ => None
+    }
+
+    selectedManager match {
+      case None =>
+        warn(s"Ignoring unexpected quota key ${quotaRecord.key()} for entity $quotaEntity")
+
+      case Some(manager) =>
+        def named(name: String): Option[String] = Some(Sanitizer.sanitize(name))
+        val defaultName: Option[String] = Some(ConfigEntityName.Default)
+
+        // Convert entity into Options with sanitized values for QuotaManagers
+        val (sanitizedUser, sanitizedClientId): (Option[String], Option[String]) = quotaEntity match {
+          case UserEntity(user) => (named(user), None)
+          case DefaultUserEntity => (defaultName, None)
+          case ClientIdEntity(clientId) => (None, named(clientId))
+          case DefaultClientIdEntity => (None, defaultName)
+          case ExplicitUserExplicitClientIdEntity(user, clientId) => (named(user), named(clientId))
+          case ExplicitUserDefaultClientIdEntity(user) => (named(user), defaultName)
+          case DefaultUserExplicitClientIdEntity(clientId) => (defaultName, named(clientId))
+          case DefaultUserDefaultClientIdEntity => (defaultName, defaultName)
+          case IpEntity(_) | DefaultIpEntity =>
+            throw new IllegalStateException("Should not see IP quota entities here")
+        }
+
+        // A removal is passed to the manager as an absent quota
+        val quotaValue = if (quotaRecord.remove()) None else Some(new Quota(quotaRecord.value(), true))
+
+        manager.updateQuota(
+          sanitizedUser = sanitizedUser,
+          clientId = sanitizedClientId.map(Sanitizer.desanitize),
+          sanitizedClientId = sanitizedClientId,
+          quota = quotaValue)
+
+        quotaCache.updateQuotaCache(quotaEntity, quotaRecord.key, quotaRecord.value, quotaRecord.remove)
+    }
+  }
+}
diff --git
a/core/src/test/scala/unit/kafka/server/metadata/ClientQuotaMetadataManagerTest.scala
b/core/src/test/scala/unit/kafka/server/metadata/ClientQuotaMetadataManagerTest.scala
new file mode 100644
index 0000000..02982f4
--- /dev/null
+++
b/core/src/test/scala/unit/kafka/server/metadata/ClientQuotaMetadataManagerTest.scala
@@ -0,0 +1,452 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server.metadata
+
+import kafka.network.ConnectionQuotas
+import kafka.server.QuotaFactory.QuotaManagers
+import kafka.server.{ConfigEntityName, KafkaConfig, QuotaFactory}
+import kafka.utils.{MockTime, TestUtils}
+import org.apache.kafka.common.config.internals.QuotaConfigs
+import org.apache.kafka.common.errors.{InvalidRequestException,
UnsupportedVersionException}
+import org.apache.kafka.common.metadata.QuotaRecord
+import org.apache.kafka.common.metrics.{Metrics, Quota}
+import org.apache.kafka.common.quota.{ClientQuotaEntity,
ClientQuotaFilterComponent}
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.{BeforeEach, Test}
+import org.mockito.ArgumentMatchers.{any, eq => _eq}
+import org.mockito.Mockito._
+
+import java.net.InetAddress
+import java.util.Properties
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+class ClientQuotaMetadataManagerTest {
+
+ var manager: ClientQuotaMetadataManager = _
+ var cache: ClientQuotaCache = _
+
+  // Build a fresh cache and manager for every test: the client quota managers are Mockito spies so that
+  // calls can be verified, and the connection quotas are a plain mock.
+  @BeforeEach
+  def setup(): Unit = {
+    val brokerProps = TestUtils.createBrokerConfigs(1, TestUtils.MockZkConnect).head
+    val config = KafkaConfig.fromProps(brokerProps, new Properties())
+
+    val realManagers = QuotaFactory.instantiate(config, new Metrics, new MockTime, "quota-metadata-processor-test")
+    val spiedQuotaManagers = QuotaManagers(
+      fetch = spy(realManagers.fetch),
+      produce = spy(realManagers.produce),
+      request = spy(realManagers.request),
+      controllerMutation = spy(realManagers.controllerMutation),
+      leader = realManagers.leader,
+      follower = realManagers.follower,
+      alterLogDirs = realManagers.alterLogDirs,
+      clientQuotaCallback = realManagers.clientQuotaCallback
+    )
+
+    cache = new ClientQuotaCache()
+    manager = new ClientQuotaMetadataManager(spiedQuotaManagers, mock(classOf[ConnectionQuotas]), cache)
+  }
+
+  // A strict describe of each configured entity yields exactly one result; a strict describe of an entity
+  // that was never configured yields none.
+  @Test
+  def testDescribeStrictMatch(): Unit = {
+    setupAndVerify(manager, { case (entity, _) =>
+      val filterComponents = mutable.ListBuffer[ClientQuotaFilterComponent]()
+      entityToFilter(entity, filterComponents)
+      val matched = cache.describeClientQuotas(filterComponents.toSeq, strict = true)
+      assertEquals(1, matched.size, s"Should only match one quota for ${entity}")
+    })
+
+    val nonMatching = List(
+      userClientEntity("user-1", "client-id-2"),
+      userClientEntity("user-3", "client-id-1"),
+      userClientEntity("user-2", null),
+      userEntity("user-4"),
+      userClientEntity(null, "client-id-2"),
+      clientEntity("client-id-1"),
+      clientEntity("client-id-3")
+    )
+
+    for (entity <- nonMatching) {
+      val filterComponents = mutable.ListBuffer[ClientQuotaFilterComponent]()
+      entityToFilter(entity, filterComponents)
+      val matched = cache.describeClientQuotas(filterComponents.toSeq, strict = true)
+      assertEquals(0, matched.size)
+    }
+  }
+
+  // Non-strict describes return every entity containing the requested part, whatever other parts it has.
+  @Test
+  def testDescribeNonStrictMatch(): Unit = {
+    setupAndVerify(manager, { case (_, _) => })
+
+    // Match open-ended existing user.
+    val user1Filter = mutable.ListBuffer[ClientQuotaFilterComponent]()
+    entityToFilter(userEntity("user-1"), user1Filter)
+    val user1Matches = cache.describeClientQuotasInternal(user1Filter.toSeq, strict = false)
+    assertEquals(3, user1Matches.size)
+    assertEquals(3, user1Matches.keySet.count {
+      case UserEntity("user-1") => true
+      case ExplicitUserDefaultClientIdEntity("user-1") => true
+      case ExplicitUserExplicitClientIdEntity("user-1", _) => true
+      case _ => false
+    })
+
+    val user1StrictMatches = cache.describeClientQuotasInternal(user1Filter.toSeq, strict = true)
+    assertEquals(1, user1StrictMatches.size)
+
+    // Match open-ended non-existent user.
+    val unknownUserFilter = mutable.ListBuffer[ClientQuotaFilterComponent]()
+    entityToFilter(userEntity("unknown"), unknownUserFilter)
+    val unknownUserMatches = cache.describeClientQuotasInternal(unknownUserFilter.toSeq, strict = false)
+    assertEquals(0, unknownUserMatches.size)
+
+    // Match open-ended existing client ID.
+    val client2Filter = mutable.ListBuffer[ClientQuotaFilterComponent]()
+    entityToFilter(clientEntity("client-id-2"), client2Filter)
+    val client2Matches = cache.describeClientQuotasInternal(client2Filter.toSeq, strict = false)
+    assertEquals(2, client2Matches.size)
+    assertEquals(2, client2Matches.keySet.count {
+      case ClientIdEntity("client-id-2") => true
+      case DefaultUserExplicitClientIdEntity("client-id-2") => true
+      case ExplicitUserExplicitClientIdEntity(_, "client-id-2") => true
+      case _ => false
+    })
+
+    // Match open-ended default user.
+    val defaultUserFilter = Seq(ClientQuotaFilterComponent.ofDefaultEntity(ClientQuotaEntity.USER))
+    val defaultUserMatches = cache.describeClientQuotasInternal(defaultUserFilter, strict = false)
+    assertEquals(3, defaultUserMatches.size)
+    assertEquals(3, defaultUserMatches.keySet.count {
+      case DefaultUserEntity | DefaultUserExplicitClientIdEntity(_) | DefaultUserDefaultClientIdEntity => true
+      case _ => false
+    })
+
+    // Match open-ended default client.
+    val defaultClientFilter = Seq(ClientQuotaFilterComponent.ofDefaultEntity(ClientQuotaEntity.CLIENT_ID))
+    val defaultClientMatches = cache.describeClientQuotasInternal(defaultClientFilter, strict = false)
+    assertEquals(3, defaultClientMatches.size)
+    assertEquals(3, defaultClientMatches.keySet.count {
+      case DefaultClientIdEntity | ExplicitUserDefaultClientIdEntity(_) | DefaultUserDefaultClientIdEntity => true
+      case _ => false
+    })
+  }
+
+  // Filters on entity type alone match every entity containing a part of that type.
+  @Test
+  def testDescribeFilterOnTypes(): Unit = {
+    setupAndVerify(manager, { case (_, _) => })
+
+    val anyUser = ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.USER)
+    val anyClientId = ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.CLIENT_ID)
+
+    val userTypeMatches = cache.describeClientQuotasInternal(Seq(anyUser), strict = false)
+    assertEquals(11, userTypeMatches.size)
+    assertEquals(11, userTypeMatches.keySet.count {
+      case UserEntity(_) | DefaultUserEntity |
+           ExplicitUserExplicitClientIdEntity(_, _) | ExplicitUserDefaultClientIdEntity(_) |
+           DefaultUserExplicitClientIdEntity(_) | DefaultUserDefaultClientIdEntity => true
+      case _ => false
+    })
+
+    val clientTypeMatches = cache.describeClientQuotasInternal(Seq(anyClientId), strict = false)
+    assertEquals(8, clientTypeMatches.size)
+    assertEquals(8, clientTypeMatches.keySet.count {
+      case ClientIdEntity(_) | DefaultClientIdEntity |
+           ExplicitUserExplicitClientIdEntity(_, _) | ExplicitUserDefaultClientIdEntity(_) |
+           DefaultUserExplicitClientIdEntity(_) | DefaultUserDefaultClientIdEntity => true
+      case _ => false
+    })
+
+    val bothTypesStrictMatches = cache.describeClientQuotasInternal(Seq(anyUser, anyClientId), strict = true)
+    assertEquals(7, bothTypesStrictMatches.size)
+    assertEquals(7, bothTypesStrictMatches.keySet.count {
+      case ExplicitUserExplicitClientIdEntity(_, _) | ExplicitUserDefaultClientIdEntity(_) |
+           DefaultUserExplicitClientIdEntity(_) | DefaultUserDefaultClientIdEntity => true
+      case _ => false
+    })
+  }
+
+  // A client ID literally named ConfigEntityName.Default and the default client ID (null name) are two
+  // separate entities, each described on its own.
+  @Test
+  def testEntityWithDefaultName(): Unit = {
+    val quotaKey = QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG
+    addQuotaRecord(manager, clientEntity(ConfigEntityName.Default), (quotaKey, 20000.0))
+    addQuotaRecord(manager, clientEntity(null), (quotaKey, 30000.0))
+
+    val namedFilter = mutable.ListBuffer[ClientQuotaFilterComponent]()
+    entityToFilter(clientEntity(ConfigEntityName.Default), namedFilter)
+    assertEquals(1, cache.describeClientQuotas(namedFilter.toSeq, strict = true).size)
+
+    val defaultFilter = mutable.ListBuffer[ClientQuotaFilterComponent]()
+    entityToFilter(clientEntity(null), defaultFilter)
+    assertEquals(1, cache.describeClientQuotas(defaultFilter.toSeq, strict = true).size)
+  }
+
+  // Quotas can be added, overwritten and removed one key at a time; removing an absent key is a no-op.
+  @Test
+  def testQuotaRemoval(): Unit = {
+    val producerKey = QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG
+    val consumerKey = QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG
+    val entity = userClientEntity("user", "client-id")
+
+    addQuotaRecord(manager, entity, (producerKey, 10000.0))
+    addQuotaRecord(manager, entity, (consumerKey, 20000.0))
+    val afterAdds = describeEntity(entity)
+    assertEquals(2, afterAdds.size)
+    assertEquals(10000.0, afterAdds(producerKey), 1e-6)
+
+    addQuotaRecord(manager, entity, (producerKey, 10001.0))
+    val afterOverwrite = describeEntity(entity)
+    assertEquals(2, afterOverwrite.size)
+    assertEquals(10001.0, afterOverwrite(producerKey), 1e-6)
+
+    addQuotaRemovalRecord(manager, entity, producerKey)
+    val afterProducerRemoval = describeEntity(entity)
+    assertEquals(1, afterProducerRemoval.size)
+    assertFalse(afterProducerRemoval.contains(producerKey))
+
+    addQuotaRemovalRecord(manager, entity, consumerKey)
+    assertEquals(0, describeEntity(entity).size)
+
+    // Removing non-existent quota should not do anything
+    addQuotaRemovalRecord(manager, entity, consumerKey)
+    assertEquals(0, describeEntity(entity).size)
+  }
+
+  // An IP filter combined with a user or client ID filter, and any duplicated entity type, must be rejected
+  // with an InvalidRequestException carrying the matching message.
+  @Test
+  def testDescribeClientQuotasInvalidFilterCombination(): Unit = {
+    val ipFilterComponent = ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.IP)
+    val userFilterComponent = ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.USER)
+    val clientIdFilterComponent = ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.CLIENT_ID)
+
+    // The third argument of assertThrows is only the message shown when the assertion fails, so the message
+    // of the thrown exception has to be checked explicitly on the returned exception.
+    def assertInvalidRequest(filters: Seq[ClientQuotaFilterComponent], expectedMessagePrefix: String): Unit = {
+      val exception = assertThrows(classOf[InvalidRequestException],
+        () => cache.describeClientQuotas(filters, strict = false))
+      assertTrue(exception.getMessage.startsWith(expectedMessagePrefix),
+        s"Expected a message starting with '$expectedMessagePrefix' but got '${exception.getMessage}'")
+    }
+
+    val invalidCombinationMessage = "Invalid entity filter component combination"
+    assertInvalidRequest(Seq(ipFilterComponent, userFilterComponent), invalidCombinationMessage)
+    assertInvalidRequest(Seq(ipFilterComponent, clientIdFilterComponent), invalidCombinationMessage)
+
+    // Duplicate entity types are rejected during validation, before the combination check is reached
+    assertInvalidRequest(Seq(ipFilterComponent, ipFilterComponent),
+      s"Duplicate ${ClientQuotaEntity.IP} filter component entity type")
+    assertInvalidRequest(Seq(userFilterComponent, userFilterComponent),
+      s"Duplicate ${ClientQuotaEntity.USER} filter component entity type")
+  }
+
+ @Test
+ def testDescribeEmptyFilter(): Unit = {
+   // An empty filter list is expected to match nothing in both non-strict and strict mode.
+   Seq(false, true).foreach { strictMode =>
+     val matched = cache.describeClientQuotas(Seq.empty, strict = strictMode)
+     assertEquals(0, matched.size)
+   }
+ }
+
+ @Test
+ def testDescribeUnsupportedEntityType(): Unit = {
+   // A filter on the entity type "other" is expected to raise UnsupportedVersionException.
+   assertThrows(classOf[UnsupportedVersionException], () => {
+     val unsupportedFilter = Seq(ClientQuotaFilterComponent.ofEntityType("other"))
+     cache.describeClientQuotas(unsupportedFilter, strict = false)
+   })
+ }
+
+ @Test
+ def testDescribeMissingEntityType(): Unit = {
+   // A filter whose entity type is the empty string is expected to raise InvalidRequestException.
+   assertThrows(classOf[InvalidRequestException], () => {
+     val untypedFilter = Seq(ClientQuotaFilterComponent.ofEntity("", "name"))
+     cache.describeClientQuotas(untypedFilter, strict = false)
+   })
+ }
+
+ @Test
+ def testQuotaManagers(): Unit = {
+   val target = userClientEntity("user", "client")
+
+   // Each quota key is paired with the quota manager that should receive the update for it.
+   val expectedRouting = Seq(
+     QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG -> manager.quotaManagers.fetch,
+     QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> manager.quotaManagers.produce,
+     QuotaConfigs.REQUEST_PERCENTAGE_OVERRIDE_CONFIG -> manager.quotaManagers.request,
+     QuotaConfigs.CONTROLLER_MUTATION_RATE_OVERRIDE_CONFIG -> manager.quotaManagers.controllerMutation
+   )
+
+   // Add one record per key and check the paired manager saw exactly one matching update.
+   expectedRouting.foreach { case (quotaKey, quotaManager) =>
+     addQuotaRecord(manager, target, (quotaKey, 100.0))
+     verify(quotaManager, times(1)).updateQuota(
+       _eq(Some("user")),
+       _eq(Some("client")),
+       _eq(Some("client")),
+       any(classOf[Option[Quota]])
+     )
+   }
+
+   // A removal record must reach the same manager with no quota value.
+   addQuotaRemovalRecord(manager, target, QuotaConfigs.CONTROLLER_MUTATION_RATE_OVERRIDE_CONFIG)
+   verify(manager.quotaManagers.controllerMutation, times(1)).updateQuota(
+     _eq(Some("user")),
+     _eq(Some("client")),
+     _eq(Some("client")),
+     _eq(None)
+   )
+ }
+
+ @Test
+ def testIpQuota(): Unit = {
+   val connectionRateKey = QuotaConfigs.IP_CONNECTION_RATE_OVERRIDE_CONFIG
+   val defaultIp = ipEntity(null)
+   val knownIp = ipEntity("1.2.3.4")
+
+   // One record for the default IP entity and one for a named IP: two updates in total.
+   addQuotaRecord(manager, defaultIp, (connectionRateKey, 100.0))
+   addQuotaRecord(manager, knownIp, (connectionRateKey, 99.0))
+   verify(manager.connectionQuotas, times(2)).updateIpConnectionRateQuota(
+     any(classOf[Option[InetAddress]]),
+     any(classOf[Option[Int]])
+   )
+
+   // Each entity can be described on its own.
+   assertEquals(1, describeEntity(defaultIp).size)
+   assertEquals(1, describeEntity(knownIp).size)
+
+   // A non-strict filter on the IP entity type matches both.
+   val ipTypeFilter = Seq(ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.IP))
+   assertEquals(2, cache.describeClientQuotas(ipTypeFilter, strict = false).size)
+
+   // Overwriting the named IP's quota is propagated with the new value as an Int.
+   reset(manager.connectionQuotas)
+   addQuotaRecord(manager, knownIp, (connectionRateKey, 98.0))
+   verify(manager.connectionQuotas, times(1)).updateIpConnectionRateQuota(
+     any(classOf[Option[InetAddress]]),
+     _eq(Some(98))
+   )
+
+   // Removing the quota is propagated as None.
+   reset(manager.connectionQuotas)
+   addQuotaRemovalRecord(manager, knownIp, connectionRateKey)
+   verify(manager.connectionQuotas, times(1)).updateIpConnectionRateQuota(
+     any(classOf[Option[InetAddress]]),
+     _eq(None)
+   )
+ }
+
+ @Test
+ def testIpQuotaUnknownKey(): Unit = {
+   val defaultIp = ipEntity(null)
+   val unknownQuota = ("not-an-ip-quota-key", 100.0)
+   addQuotaRecord(manager, defaultIp, unknownQuota)
+
+   // NOTE(review): this only rules out a call carrying Some(100); a call with any other
+   // rate would not be detected here.
+   verify(manager.connectionQuotas, times(0)).updateIpConnectionRateQuota(
+     any(classOf[Option[InetAddress]]),
+     _eq(Some(100))
+   )
+
+   // The unknown key must not show up when the entity is described.
+   val described = describeEntity(defaultIp)
+   assertEquals(0, described.size)
+ }
+
+ @Test
+ def testUserQuotaUnknownKey(): Unit = {
+   // A record with an unrecognised key on the default user must not become describable.
+   val defaultUser = userEntity(null)
+   val unknownQuota = ("not-a-user-quota-key", 100.0)
+   addQuotaRecord(manager, defaultUser, unknownQuota)
+
+   val described = describeEntity(defaultUser)
+   assertEquals(0, described.size)
+ }
+
+ /**
+  * Adds a fixed set of request-percentage quota records to `manager`, then calls `verifier`
+  * once per record with the entity and the (key, value) pair that was added.
+  *
+  * All records are added before the first verification runs.
+  */
+ def setupAndVerify(manager: ClientQuotaMetadataManager,
+                    verifier: (List[QuotaRecord.EntityData], (String, Double)) => Unit ): Unit = {
+   val quotaKey = QuotaConfigs.REQUEST_PERCENTAGE_OVERRIDE_CONFIG
+   val fixtures = List(
+     (userClientEntity("user-1", "client-id-1"), 50.50),
+     (userClientEntity("user-2", "client-id-1"), 51.51),
+     (userClientEntity("user-3", "client-id-2"), 52.52),
+     (userClientEntity(null, "client-id-1"), 53.53),
+     (userClientEntity("user-1", null), 54.54),
+     (userClientEntity("user-3", null), 55.55),
+     (userEntity("user-1"), 56.56),
+     (userEntity("user-2"), 57.57),
+     (userEntity("user-3"), 58.58),
+     (userEntity(null), 59.59),
+     (clientEntity("client-id-2"), 60.60),
+     (userClientEntity(null, null), 61.61)
+   )
+
+   // Phase 1: populate every quota.
+   for ((entity, value) <- fixtures) {
+     addQuotaRecord(manager, entity, (quotaKey, value))
+   }
+
+   // Phase 2: hand each fixture to the caller-supplied check.
+   for ((entity, value) <- fixtures) {
+     verifier(entity, (quotaKey, value))
+   }
+ }
+
+ /**
+  * Describes the quotas of exactly one entity using a strict filter built from `entity`.
+  *
+  * @return the quotas of the single matched entity, or an empty map when nothing matched
+  * @throws AssertionError if the strict filter matched more than one entity
+  */
+ def describeEntity(entity: List[QuotaRecord.EntityData]): Map[String, Double] = {
+   val filter = mutable.ListBuffer[ClientQuotaFilterComponent]()
+   entityToFilter(entity, filter)
+   val matches = cache.describeClientQuotas(filter.toSeq, strict = true)
+   matches.size match {
+     case 0 => Map()
+     case 1 => matches.head._2
+     case _ =>
+       throw new AssertionError("Matched more than one entity with strict=true describe filter")
+   }
+ }
+
+ /**
+  * Builds a QuotaRecord that sets `quota` (key, value) on `entity` and passes it to
+  * `manager.handleQuotaRecord`.
+  */
+ def addQuotaRecord(manager: ClientQuotaMetadataManager, entity: List[QuotaRecord.EntityData], quota: (String, Double)): Unit = {
+   val (quotaKey, quotaValue) = quota
+   val record = new QuotaRecord()
+     .setEntity(entity.asJava)
+     .setKey(quotaKey)
+     .setValue(quotaValue)
+   manager.handleQuotaRecord(record)
+ }
+
+ /**
+  * Builds a QuotaRecord with the remove flag set for key `quota` on `entity` and passes it
+  * to `manager.handleQuotaRecord`.
+  */
+ def addQuotaRemovalRecord(manager: ClientQuotaMetadataManager, entity: List[QuotaRecord.EntityData], quota: String): Unit = {
+   val removal = new QuotaRecord()
+     .setEntity(entity.asJava)
+     .setKey(quota)
+     .setRemove(true)
+   manager.handleQuotaRecord(removal)
+ }
+
+ /**
+  * Appends one filter component per entry of `entity` to `components`.
+  *
+  * An entry with a null name becomes a default-entity filter for its type; any other entry
+  * becomes an exact-name filter.
+  */
+ def entityToFilter(entity: List[QuotaRecord.EntityData], components: mutable.ListBuffer[ClientQuotaFilterComponent]): Unit = {
+   entity.foreach { data =>
+     val component = Option(data.entityName()) match {
+       case Some(name) => ClientQuotaFilterComponent.ofEntity(data.entityType(), name)
+       case None => ClientQuotaFilterComponent.ofDefaultEntity(data.entityType())
+     }
+     components += component
+   }
+ }
+
+ /** Single-element entity list of type CLIENT_ID named `clientId` (may be null). */
+ def clientEntity(clientId: String): List[QuotaRecord.EntityData] = {
+   val clientData = new QuotaRecord.EntityData()
+     .setEntityType(ClientQuotaEntity.CLIENT_ID)
+     .setEntityName(clientId)
+   clientData :: Nil
+ }
+
+ /** Single-element entity list of type USER named `user` (may be null). */
+ def userEntity(user: String): List[QuotaRecord.EntityData] = {
+   val userData = new QuotaRecord.EntityData()
+     .setEntityType(ClientQuotaEntity.USER)
+     .setEntityName(user)
+   userData :: Nil
+ }
+
+ /** Two-element entity list: a USER entry named `user` followed by a CLIENT_ID entry named `clientId`. */
+ def userClientEntity(user: String, clientId: String): List[QuotaRecord.EntityData] = {
+   // Composed from the single-entity builders; the USER entry comes first.
+   userEntity(user) ++ clientEntity(clientId)
+ }
+
+ /** Single-element entity list of type IP named `ip` (may be null). */
+ def ipEntity(ip: String): List[QuotaRecord.EntityData] = {
+   val ipData = new QuotaRecord.EntityData()
+     .setEntityType(ClientQuotaEntity.IP)
+     .setEntityName(ip)
+   ipData :: Nil
+ }
+}