This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.8 by this push:
     new f2b428c  MINOR: Add ClientQuotaMetadataManager for processing 
QuotaRecord  (#10101)
f2b428c is described below

commit f2b428cbdb9cf84feca90d9c7718cc756df734b6
Author: David Arthur <[email protected]>
AuthorDate: Wed Feb 10 17:04:22 2021 -0500

    MINOR: Add ClientQuotaMetadataManager for processing QuotaRecord  (#10101)
    
    This PR brings in the new broker metadata processor for handling 
QuotaRecord-s coming from the metadata log. Also included is a new cache class 
to allow for fast lookups of quotas on the broker for handling 
DescribeClientQuotaRequest.
    
    Reviewers: Colin P. McCabe <[email protected]>
---
 .../kafka/server/metadata/ClientQuotaCache.scala   | 297 ++++++++++++++
 .../metadata/ClientQuotaMetadataManager.scala      | 174 ++++++++
 .../metadata/ClientQuotaMetadataManagerTest.scala  | 452 +++++++++++++++++++++
 3 files changed, 923 insertions(+)

diff --git a/core/src/main/scala/kafka/server/metadata/ClientQuotaCache.scala 
b/core/src/main/scala/kafka/server/metadata/ClientQuotaCache.scala
new file mode 100644
index 0000000..ad2378e
--- /dev/null
+++ b/core/src/main/scala/kafka/server/metadata/ClientQuotaCache.scala
@@ -0,0 +1,297 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server.metadata
+
+import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
+import org.apache.kafka.common.errors.{InvalidRequestException, 
UnsupportedVersionException}
+import org.apache.kafka.common.quota.{ClientQuotaEntity, 
ClientQuotaFilterComponent}
+
+import java.util.concurrent.locks.ReentrantReadWriteLock
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+
+// A type for the cache index keys
+sealed trait CacheIndexKey
+case object DefaultUser extends CacheIndexKey
+case class SpecificUser(user: String) extends CacheIndexKey
+case object DefaultClientId extends CacheIndexKey
+case class SpecificClientId(clientId: String) extends CacheIndexKey
+case object DefaultIp extends CacheIndexKey
+case class SpecificIp(ip: String) extends CacheIndexKey
+
+
+// Different types of matching constraints
+sealed trait QuotaMatch
+case class ExactMatch(entityName: String) extends QuotaMatch
+case object DefaultMatch extends QuotaMatch
+case object TypeMatch extends QuotaMatch
+
+
+/**
+ * Maintains a cache of QuotaEntity and their respective quotas.
+ *
+ * The main cache is structured like:
+ *
+ * <pre>
+ * {
+ *   (user:alice) -> {consumer_byte_rate: 10000},
+ *   (user:alice,client:x) -> {consumer_byte_rate: 8000, producer_byte_rate: 
8000}
+ * }
+ * </pre>
+ *
+ * In addition to this cache, this class maintains three indexes for the three 
supported entity types (user, client,
+ * and IP). These indexes map a part of an entity to the list of all 
QuotaEntity which contain that entity. For example:
+ *
+ * <pre>
+ * {
+ *   SpecificUser(alice) -> [(user:alice), (user:alice,client:x)],
+ *   DefaultUser -> [(user:default), (user:default, client:x)]
+ * }
+ * </pre>
+ *
+ * These indexes exist to support the flexible lookups needed by 
DescribeClientQuota RPC
+ */
+class ClientQuotaCache {
+  private type QuotaCacheIndex = mutable.HashMap[CacheIndexKey, 
mutable.HashSet[QuotaEntity]]
+
+  private val quotaCache = new mutable.HashMap[QuotaEntity, 
mutable.Map[String, Double]]
+
+  // We need three separate indexes because we also support wildcard lookups 
on entity type.
+  private val userEntityIndex = new QuotaCacheIndex
+  private val clientIdEntityIndex = new QuotaCacheIndex
+  private val ipEntityIndex = new QuotaCacheIndex
+
+  private val lock = new ReentrantReadWriteLock()
+
+  /**
+   * Return quota entries for a given filter. These entries are returned from 
an in-memory cache and may not reflect
+   * the latest state of the quotas according to the controller. If a filter 
is given for an unsupported entity type
+   * or an invalid combination of entity types, this method will throw an 
exception.
+   *
+   * @param filters       A collection of quota filters (entity type and a 
match clause).
+   * @param strict        True if we should only return entities which match 
all the filter clauses and have no
+   *                      additional unmatched parts.
+   * @return              A mapping of quota entities along with their quota 
values.
+   */
+  def describeClientQuotas(filters: Seq[ClientQuotaFilterComponent], strict: 
Boolean):
+      Map[ClientQuotaEntity, Map[String, Double]] = inReadLock(lock) {
+    describeClientQuotasInternal(filters, strict).map { case (entity, value) 
=> convertEntity(entity) -> value}
+  }
+
+  // Visible for testing (QuotaEntity is nicer for assertions in test code)
+  private[metadata] def describeClientQuotasInternal(filters: 
Seq[ClientQuotaFilterComponent], strict: Boolean):
+      Map[QuotaEntity, Map[String, Double]] = inReadLock(lock) {
+
+    // Do some preliminary validation of the filter types and convert them to 
correct QuotaMatch type
+    val entityFilters = mutable.HashMap.empty[String, QuotaMatch]
+    filters.foreach { component =>
+      val entityType = component.entityType()
+      if (entityFilters.contains(entityType)) {
+        throw new InvalidRequestException(s"Duplicate ${entityType} filter 
component entity type")
+      } else if (entityType.isEmpty) {
+        throw new InvalidRequestException("Unexpected empty filter component 
entity type")
+      } else if (!ClientQuotaEntity.isValidEntityType(entityType)) {
+        throw new UnsupportedVersionException(s"Custom entity type 
${entityType} not supported")
+      }
+
+      // A present "match()" is an exact match on name, an absent "match()" is 
a match on the default entity,
+      // and a null "match()" is a match on the entity type
+      val entityMatch = if (component.`match`() != null && 
component.`match`().isPresent) {
+        ExactMatch(component.`match`().get())
+      } else if (component.`match`() != null) {
+        DefaultMatch
+      } else {
+        TypeMatch
+      }
+      entityFilters.put(entityType, entityMatch)
+    }
+
+    if (entityFilters.isEmpty) {
+      return Map.empty
+    }
+
+    // We do not allow IP filters to be combined with user or client filters
+    val matchingEntities: Set[QuotaEntity] = if 
(entityFilters.contains(ClientQuotaEntity.IP)) {
+      if (entityFilters.size > 1) {
+        throw new InvalidRequestException("Invalid entity filter component 
combination, IP filter component should " +
+          "not be used with user or clientId filter component.")
+      }
+      val ipMatch = entityFilters.get(ClientQuotaEntity.IP)
+      ipMatch.fold(Set.empty[QuotaEntity]) {
+          case ExactMatch(ip) => ipEntityIndex.getOrElse(SpecificIp(ip), 
Set.empty).toSet
+          case DefaultMatch => ipEntityIndex.getOrElse(DefaultIp, 
Set.empty).toSet
+          case TypeMatch => ipEntityIndex.values.flatten.toSet
+      }
+    } else if (entityFilters.contains(ClientQuotaEntity.USER) || 
entityFilters.contains(ClientQuotaEntity.CLIENT_ID)) {
+      // If either are present, check both user and client indexes
+      val userMatch = entityFilters.get(ClientQuotaEntity.USER)
+      val userIndexMatches = userMatch.fold(Set.empty[QuotaEntity]) {
+        case ExactMatch(user) => userEntityIndex.getOrElse(SpecificUser(user), 
Set.empty).toSet
+        case DefaultMatch => userEntityIndex.getOrElse(DefaultUser, 
Set.empty).toSet
+        case TypeMatch => userEntityIndex.values.flatten.toSet
+      }
+
+      val clientMatch = entityFilters.get(ClientQuotaEntity.CLIENT_ID)
+      val clientIndexMatches = clientMatch.fold(Set.empty[QuotaEntity]) {
+        case ExactMatch(clientId) => 
clientIdEntityIndex.getOrElse(SpecificClientId(clientId), Set.empty).toSet
+        case DefaultMatch => clientIdEntityIndex.getOrElse(DefaultClientId, 
Set.empty).toSet
+        case TypeMatch => clientIdEntityIndex.values.flatten.toSet
+      }
+
+      val candidateMatches = if (userMatch.isDefined && clientMatch.isDefined) 
{
+        userIndexMatches.intersect(clientIndexMatches)
+      } else if (userMatch.isDefined) {
+        userIndexMatches
+      } else {
+        clientIndexMatches
+      }
+
+      if (strict) {
+        // If in strict mode, we need to remove any matches with unspecified 
entity types. This only applies to results
+        // with more than one entity part (i.e., user and clientId)
+        candidateMatches.filter { quotaEntity =>
+          quotaEntity match {
+            case ExplicitUserExplicitClientIdEntity(_, _) => 
userMatch.isDefined && clientMatch.isDefined
+            case DefaultUserExplicitClientIdEntity(_) => userMatch.isDefined 
&& clientMatch.isDefined
+            case ExplicitUserDefaultClientIdEntity(_) => userMatch.isDefined 
&& clientMatch.isDefined
+            case DefaultUserDefaultClientIdEntity => userMatch.isDefined && 
clientMatch.isDefined
+            case _ => true
+          }
+        }
+      } else {
+        candidateMatches
+      }
+    } else {
+      // ClientQuotaEntity.isValidEntityType check above should prevent any 
unknown entity types
+      throw new IllegalStateException(s"Unexpected handling of 
${entityFilters} after filter validation")
+    }
+
+    val resultsMap: Map[QuotaEntity, Map[String, Double]] = 
matchingEntities.map {
+      quotaEntity => {
+        quotaCache.get(quotaEntity) match {
+          case Some(quotas) => quotaEntity -> quotas.toMap
+          case None => quotaEntity -> Map.empty[String, Double]
+        }
+      }
+    }.toMap
+
+    resultsMap
+  }
+
+  private def convertEntity(entity: QuotaEntity): ClientQuotaEntity = {
+    val entityMap = entity match {
+      case IpEntity(ip) => Map(ClientQuotaEntity.IP -> ip)
+      case DefaultIpEntity => Map(ClientQuotaEntity.IP -> null)
+      case UserEntity(user) => Map(ClientQuotaEntity.USER -> user)
+      case DefaultUserEntity => Map(ClientQuotaEntity.USER -> null)
+      case ClientIdEntity(clientId) => Map(ClientQuotaEntity.CLIENT_ID -> 
clientId)
+      case DefaultClientIdEntity => Map(ClientQuotaEntity.CLIENT_ID -> null)
+      case ExplicitUserExplicitClientIdEntity(user, clientId) =>
+        Map(ClientQuotaEntity.USER -> user, ClientQuotaEntity.CLIENT_ID -> 
clientId)
+      case ExplicitUserDefaultClientIdEntity(user) =>
+        Map(ClientQuotaEntity.USER -> user, ClientQuotaEntity.CLIENT_ID -> 
null)
+      case DefaultUserExplicitClientIdEntity(clientId) =>
+        Map(ClientQuotaEntity.USER -> null, ClientQuotaEntity.CLIENT_ID -> 
clientId)
+      case DefaultUserDefaultClientIdEntity =>
+        Map(ClientQuotaEntity.USER -> null, ClientQuotaEntity.CLIENT_ID -> 
null)
+    }
+    new ClientQuotaEntity(entityMap.asJava)
+  }
+
+  // Update the cache indexes
+  private def updateCacheIndex(quotaEntity: QuotaEntity,
+                               remove: Boolean)
+                              (quotaCacheIndex: QuotaCacheIndex,
+                               key: CacheIndexKey): Unit = {
+    if (remove) {
+      val needsCleanup = quotaCacheIndex.get(key) match {
+        case Some(quotaEntitySet) =>
+          quotaEntitySet.remove(quotaEntity)
+          quotaEntitySet.isEmpty
+        case None => false
+      }
+      if (needsCleanup) {
+        quotaCacheIndex.remove(key)
+      }
+    } else {
+      quotaCacheIndex.getOrElseUpdate(key, 
mutable.HashSet.empty).add(quotaEntity)
+    }
+  }
+
+  /**
+   * Update the quota cache with the given entity and quota key/value. If 
remove is set, the value is ignore and
+   * the quota entry is removed for the given key. No validation on quota keys 
is performed here, it is assumed
+   * that the caller has already done this.
+   *
+   * @param entity    A quota entity, either a specific entity or the default 
entity for the given type(s)
+   * @param key       The quota key
+   * @param value     The quota value
+   * @param remove    True if we should remove the given quota key from the 
entity's quota cache
+   */
+  def updateQuotaCache(entity: QuotaEntity, key: String, value: Double, 
remove: Boolean): Unit = inWriteLock(lock) {
+    val quotaValues = quotaCache.getOrElseUpdate(entity, mutable.HashMap.empty)
+    val removeFromIndex = if (remove) {
+      quotaValues.remove(key)
+      if (quotaValues.isEmpty) {
+        quotaCache.remove(entity)
+        true
+      } else {
+        false
+      }
+    } else {
+      quotaValues.put(key, value)
+      false
+    }
+
+    // Update the appropriate indexes with the entity
+    val updateCacheIndexPartial: (QuotaCacheIndex, CacheIndexKey) => Unit = 
updateCacheIndex(entity, removeFromIndex)
+    entity match {
+      case UserEntity(user) =>
+        updateCacheIndexPartial(userEntityIndex, SpecificUser(user))
+      case DefaultUserEntity =>
+        updateCacheIndexPartial(userEntityIndex, DefaultUser)
+
+      case ClientIdEntity(clientId) =>
+        updateCacheIndexPartial(clientIdEntityIndex, 
SpecificClientId(clientId))
+      case DefaultClientIdEntity =>
+        updateCacheIndexPartial(clientIdEntityIndex, DefaultClientId)
+
+      case ExplicitUserExplicitClientIdEntity(user, clientId) =>
+        updateCacheIndexPartial(userEntityIndex, SpecificUser(user))
+        updateCacheIndexPartial(clientIdEntityIndex, 
SpecificClientId(clientId))
+
+      case ExplicitUserDefaultClientIdEntity(user) =>
+        updateCacheIndexPartial(userEntityIndex, SpecificUser(user))
+        updateCacheIndexPartial(clientIdEntityIndex, DefaultClientId)
+
+      case DefaultUserExplicitClientIdEntity(clientId) =>
+        updateCacheIndexPartial(userEntityIndex, DefaultUser)
+        updateCacheIndexPartial(clientIdEntityIndex, 
SpecificClientId(clientId))
+
+      case DefaultUserDefaultClientIdEntity =>
+        updateCacheIndexPartial(userEntityIndex, DefaultUser)
+        updateCacheIndexPartial(clientIdEntityIndex, DefaultClientId)
+
+      case IpEntity(ip) =>
+        updateCacheIndexPartial(ipEntityIndex, SpecificIp(ip))
+      case DefaultIpEntity =>
+        updateCacheIndexPartial(ipEntityIndex, DefaultIp)
+    }
+  }
+}
diff --git 
a/core/src/main/scala/kafka/server/metadata/ClientQuotaMetadataManager.scala 
b/core/src/main/scala/kafka/server/metadata/ClientQuotaMetadataManager.scala
new file mode 100644
index 0000000..76e0b54
--- /dev/null
+++ b/core/src/main/scala/kafka/server/metadata/ClientQuotaMetadataManager.scala
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server.metadata
+
+import kafka.network.ConnectionQuotas
+import kafka.server.ConfigEntityName
+import kafka.server.QuotaFactory.QuotaManagers
+import kafka.utils.Logging
+import org.apache.kafka.common.config.internals.QuotaConfigs
+import org.apache.kafka.common.metadata.QuotaRecord
+import org.apache.kafka.common.metrics.Quota
+import org.apache.kafka.common.quota.ClientQuotaEntity
+import org.apache.kafka.common.utils.Sanitizer
+
+import java.net.{InetAddress, UnknownHostException}
+import scala.collection.mutable
+
+
+// A strict hierarchy of entities that we support
+sealed trait QuotaEntity
+case class IpEntity(ip: String) extends QuotaEntity
+case object DefaultIpEntity extends QuotaEntity
+case class UserEntity(user: String) extends QuotaEntity
+case object DefaultUserEntity extends QuotaEntity
+case class ClientIdEntity(clientId: String) extends QuotaEntity
+case object DefaultClientIdEntity extends QuotaEntity
+case class ExplicitUserExplicitClientIdEntity(user: String, clientId: String) 
extends QuotaEntity
+case class ExplicitUserDefaultClientIdEntity(user: String) extends QuotaEntity
+case class DefaultUserExplicitClientIdEntity(clientId: String) extends 
QuotaEntity
+case object DefaultUserDefaultClientIdEntity extends QuotaEntity
+
+/**
+ * Process quota metadata records as they appear in the metadata log and 
update quota managers and cache as necessary
+ */
+class ClientQuotaMetadataManager(private[metadata] val quotaManagers: 
QuotaManagers,
+                                 private[metadata] val connectionQuotas: 
ConnectionQuotas,
+                                 private[metadata] val quotaCache: 
ClientQuotaCache) extends Logging {
+
+  def handleQuotaRecord(quotaRecord: QuotaRecord): Unit = {
+    val entityMap = mutable.Map[String, String]()
+    quotaRecord.entity().forEach { entityData =>
+      entityMap.put(entityData.entityType(), entityData.entityName())
+    }
+
+    if (entityMap.contains(ClientQuotaEntity.IP)) {
+      // In the IP quota manager, None is used for default entity
+      val ipEntity = Option(entityMap(ClientQuotaEntity.IP)) match {
+        case Some(ip) => IpEntity(ip)
+        case None => DefaultIpEntity
+      }
+      handleIpQuota(ipEntity, quotaRecord)
+    } else if (entityMap.contains(ClientQuotaEntity.USER) || 
entityMap.contains(ClientQuotaEntity.CLIENT_ID)) {
+      // Need to handle null values for default entity name, so use 
"getOrElse" combined with "contains" checks
+      val userVal = entityMap.getOrElse(ClientQuotaEntity.USER, null)
+      val clientIdVal = entityMap.getOrElse(ClientQuotaEntity.CLIENT_ID, null)
+
+      // In User+Client quota managers, "<default>" is used for default 
entity, so we need to represent all possible
+      // combinations of values, defaults, and absent entities
+      val userClientEntity = if (entityMap.contains(ClientQuotaEntity.USER) && 
entityMap.contains(ClientQuotaEntity.CLIENT_ID)) {
+        if (userVal == null && clientIdVal == null) {
+          DefaultUserDefaultClientIdEntity
+        } else if (userVal == null) {
+          DefaultUserExplicitClientIdEntity(clientIdVal)
+        } else if (clientIdVal == null) {
+          ExplicitUserDefaultClientIdEntity(userVal)
+        } else {
+          ExplicitUserExplicitClientIdEntity(userVal, clientIdVal)
+        }
+      } else if (entityMap.contains(ClientQuotaEntity.USER)) {
+        if (userVal == null) {
+          DefaultUserEntity
+        } else {
+          UserEntity(userVal)
+        }
+      } else {
+        if (clientIdVal == null) {
+          DefaultClientIdEntity
+        } else {
+          ClientIdEntity(clientIdVal)
+        }
+      }
+      handleUserClientQuota(
+        userClientEntity,
+        quotaRecord
+      )
+    } else {
+      warn(s"Ignoring unsupported quota entity ${quotaRecord.entity()}")
+    }
+  }
+
+  def handleIpQuota(ipEntity: QuotaEntity, quotaRecord: QuotaRecord): Unit = {
+    val inetAddress = ipEntity match {
+      case IpEntity(ip) =>
+        try {
+          Some(InetAddress.getByName(ip))
+        } catch {
+          case _: UnknownHostException => throw new 
IllegalArgumentException(s"Unable to resolve address $ip")
+        }
+      case DefaultIpEntity => None
+      case _ => throw new IllegalStateException("Should only handle IP quota 
entities here")
+    }
+
+    // The connection quota only understands the connection rate limit
+    if (quotaRecord.key() != QuotaConfigs.IP_CONNECTION_RATE_OVERRIDE_CONFIG) {
+      warn(s"Ignoring unexpected quota key ${quotaRecord.key()} for entity 
$ipEntity")
+      return
+    }
+
+    // Update the cache
+    quotaCache.updateQuotaCache(ipEntity, quotaRecord.key, quotaRecord.value, 
quotaRecord.remove)
+
+    // Convert the value to an appropriate Option for the quota manager
+    val newValue = if (quotaRecord.remove()) {
+      None
+    } else {
+      Some(quotaRecord.value).map(_.toInt)
+    }
+    connectionQuotas.updateIpConnectionRateQuota(inetAddress, newValue)
+  }
+
+  def handleUserClientQuota(quotaEntity: QuotaEntity, quotaRecord: 
QuotaRecord): Unit = {
+    val manager = quotaRecord.key() match {
+      case QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG => 
quotaManagers.fetch
+      case QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG => 
quotaManagers.produce
+      case QuotaConfigs.REQUEST_PERCENTAGE_OVERRIDE_CONFIG => 
quotaManagers.request
+      case QuotaConfigs.CONTROLLER_MUTATION_RATE_OVERRIDE_CONFIG => 
quotaManagers.controllerMutation
+      case _ =>
+        warn(s"Ignoring unexpected quota key ${quotaRecord.key()} for entity 
$quotaEntity")
+        return
+    }
+
+    // Convert entity into Options with sanitized values for QuotaManagers
+    val (sanitizedUser, sanitizedClientId) = quotaEntity match {
+      case UserEntity(user) => (Some(Sanitizer.sanitize(user)), None)
+      case DefaultUserEntity => (Some(ConfigEntityName.Default), None)
+      case ClientIdEntity(clientId) => (None, 
Some(Sanitizer.sanitize(clientId)))
+      case DefaultClientIdEntity => (None, Some(ConfigEntityName.Default))
+      case ExplicitUserExplicitClientIdEntity(user, clientId) => 
(Some(Sanitizer.sanitize(user)), Some(Sanitizer.sanitize(clientId)))
+      case ExplicitUserDefaultClientIdEntity(user) => 
(Some(Sanitizer.sanitize(user)), Some(ConfigEntityName.Default))
+      case DefaultUserExplicitClientIdEntity(clientId) => 
(Some(ConfigEntityName.Default), Some(Sanitizer.sanitize(clientId)))
+      case DefaultUserDefaultClientIdEntity => 
(Some(ConfigEntityName.Default), Some(ConfigEntityName.Default))
+      case IpEntity(_) | DefaultIpEntity => throw new 
IllegalStateException("Should not see IP quota entities here")
+    }
+
+    val quotaValue = if (quotaRecord.remove()) {
+      None
+    } else {
+      Some(new Quota(quotaRecord.value(), true))
+    }
+
+    manager.updateQuota(
+      sanitizedUser = sanitizedUser,
+      clientId = sanitizedClientId.map(Sanitizer.desanitize),
+      sanitizedClientId = sanitizedClientId,
+      quota = quotaValue)
+
+    quotaCache.updateQuotaCache(quotaEntity, quotaRecord.key, 
quotaRecord.value, quotaRecord.remove)
+  }
+}
diff --git 
a/core/src/test/scala/unit/kafka/server/metadata/ClientQuotaMetadataManagerTest.scala
 
b/core/src/test/scala/unit/kafka/server/metadata/ClientQuotaMetadataManagerTest.scala
new file mode 100644
index 0000000..02982f4
--- /dev/null
+++ 
b/core/src/test/scala/unit/kafka/server/metadata/ClientQuotaMetadataManagerTest.scala
@@ -0,0 +1,452 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server.metadata
+
+import kafka.network.ConnectionQuotas
+import kafka.server.QuotaFactory.QuotaManagers
+import kafka.server.{ConfigEntityName, KafkaConfig, QuotaFactory}
+import kafka.utils.{MockTime, TestUtils}
+import org.apache.kafka.common.config.internals.QuotaConfigs
+import org.apache.kafka.common.errors.{InvalidRequestException, 
UnsupportedVersionException}
+import org.apache.kafka.common.metadata.QuotaRecord
+import org.apache.kafka.common.metrics.{Metrics, Quota}
+import org.apache.kafka.common.quota.{ClientQuotaEntity, 
ClientQuotaFilterComponent}
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.{BeforeEach, Test}
+import org.mockito.ArgumentMatchers.{any, eq => _eq}
+import org.mockito.Mockito._
+
+import java.net.InetAddress
+import java.util.Properties
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+class ClientQuotaMetadataManagerTest {
+
+  var manager: ClientQuotaMetadataManager = _
+  var cache: ClientQuotaCache = _
+
+  @BeforeEach
+  def setup(): Unit = {
+    val configs = TestUtils.createBrokerConfigs(1, TestUtils.MockZkConnect)
+      .map(KafkaConfig.fromProps(_, new Properties()))
+
+    val time = new MockTime
+    val metrics = new Metrics
+    val quotaManagers = QuotaFactory.instantiate(configs.head, metrics, time, 
"quota-metadata-processor-test")
+    val spiedQuotaManagers = QuotaManagers(
+      fetch = spy(quotaManagers.fetch),
+      produce = spy(quotaManagers.produce),
+      request = spy(quotaManagers.request),
+      controllerMutation = spy(quotaManagers.controllerMutation),
+      leader = quotaManagers.leader,
+      follower = quotaManagers.follower,
+      alterLogDirs = quotaManagers.alterLogDirs,
+      clientQuotaCallback = quotaManagers.clientQuotaCallback
+    )
+    val connectionQuotas = mock(classOf[ConnectionQuotas])
+    cache = new ClientQuotaCache()
+    manager = new ClientQuotaMetadataManager(spiedQuotaManagers, 
connectionQuotas, cache)
+  }
+
+  @Test
+  def testDescribeStrictMatch(): Unit = {
+    setupAndVerify(manager, { case (entity, _) =>
+      val components = mutable.ListBuffer[ClientQuotaFilterComponent]()
+      entityToFilter(entity, components)
+      val results = cache.describeClientQuotas(components.toSeq, strict=true)
+      assertEquals(1, results.size, s"Should only match one quota for 
${entity}")
+    })
+
+    val nonMatching = List(
+      userClientEntity("user-1", "client-id-2"),
+      userClientEntity("user-3", "client-id-1"),
+      userClientEntity("user-2", null),
+      userEntity("user-4"),
+      userClientEntity(null, "client-id-2"),
+      clientEntity("client-id-1"),
+      clientEntity("client-id-3")
+    )
+
+    nonMatching.foreach( entity => {
+      val components = mutable.ListBuffer[ClientQuotaFilterComponent]()
+      entityToFilter(entity, components)
+      val results = cache.describeClientQuotas(components.toSeq, strict=true)
+      assertEquals(0, results.size)
+    })
+  }
+
+  @Test
+  def testDescribeNonStrictMatch(): Unit = {
+    setupAndVerify(manager, { case (_, _) => })
+
+    // Match open-ended existing user.
+    val components = mutable.ListBuffer[ClientQuotaFilterComponent]()
+    entityToFilter(userEntity("user-1"), components)
+    var results = cache.describeClientQuotasInternal(components.toSeq, 
strict=false)
+    assertEquals(3, results.size)
+    assertEquals(3, results.keySet.count(quotaEntity => quotaEntity match {
+      case UserEntity(user) => user.equals("user-1")
+      case ExplicitUserDefaultClientIdEntity(user) => user.equals("user-1")
+      case ExplicitUserExplicitClientIdEntity(user, _) => user.equals("user-1")
+      case _ => false
+    }))
+
+    results = cache.describeClientQuotasInternal(components.toSeq, strict=true)
+    assertEquals(1, results.size)
+
+    // Match open-ended non-existent user.
+    components.clear()
+    entityToFilter(userEntity("unknown"), components)
+    results = cache.describeClientQuotasInternal(components.toSeq, 
strict=false)
+    assertEquals(0, results.size)
+
+    // Match open-ended existing client ID.
+    components.clear()
+    entityToFilter(clientEntity("client-id-2"), components)
+    results = cache.describeClientQuotasInternal(components.toSeq, 
strict=false)
+    assertEquals(2, results.size)
+    assertEquals(2, results.keySet.count(quotaEntity => quotaEntity match {
+      case ClientIdEntity(clientId) => clientId.equals("client-id-2")
+      case DefaultUserExplicitClientIdEntity(clientId) => 
clientId.equals("client-id-2")
+      case ExplicitUserExplicitClientIdEntity(_, clientId) => 
clientId.equals("client-id-2")
+      case _ => false
+    }))
+
+    // Match open-ended default user.
+    results = cache.describeClientQuotasInternal(
+      Seq(ClientQuotaFilterComponent.ofDefaultEntity(ClientQuotaEntity.USER)), 
strict=false)
+    assertEquals(3, results.size)
+    assertEquals(3, results.keySet.count(quotaEntity => quotaEntity match {
+      case DefaultUserEntity | DefaultUserExplicitClientIdEntity(_) | 
DefaultUserDefaultClientIdEntity => true
+      case _ => false
+    }))
+
+    // Match open-ended default client.
+    results = cache.describeClientQuotasInternal(
+      
Seq(ClientQuotaFilterComponent.ofDefaultEntity(ClientQuotaEntity.CLIENT_ID)), 
strict=false)
+    assertEquals(3, results.size)
+    assertEquals(3, results.keySet.count(quotaEntity => quotaEntity match {
+      case DefaultClientIdEntity | ExplicitUserDefaultClientIdEntity(_) | 
DefaultUserDefaultClientIdEntity => true
+      case _ => false
+    }))
+  }
+
+  @Test
+  def testDescribeFilterOnTypes(): Unit = {
+    setupAndVerify(manager, { case (_, _) => })
+
+    var results = cache.describeClientQuotasInternal(
+      Seq(ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.USER)), 
strict=false)
+    assertEquals(11, results.size)
+    assertEquals(11, results.keySet.count(quotaEntity => quotaEntity match {
+      case UserEntity(_) | DefaultUserEntity | 
ExplicitUserExplicitClientIdEntity(_, _) | ExplicitUserDefaultClientIdEntity(_) 
|
+           DefaultUserExplicitClientIdEntity(_) | 
DefaultUserDefaultClientIdEntity => true
+      case _ => false
+    }))
+
+    results = cache.describeClientQuotasInternal(
+      
Seq(ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.CLIENT_ID)), 
strict=false)
+    assertEquals(8, results.size)
+    assertEquals(8, results.keySet.count(quotaEntity => quotaEntity match {
+      case ClientIdEntity(_) | DefaultClientIdEntity | 
ExplicitUserExplicitClientIdEntity(_, _) | ExplicitUserDefaultClientIdEntity(_) 
|
+           DefaultUserExplicitClientIdEntity(_) | 
DefaultUserDefaultClientIdEntity => true
+      case _ => false
+    }))
+
+    results = cache.describeClientQuotasInternal(
+      Seq(
+        ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.USER),
+        ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.CLIENT_ID)
+      ), strict=true)
+    assertEquals(7, results.size)
+    assertEquals(7, results.keySet.count(quotaEntity => quotaEntity match {
+      case ExplicitUserExplicitClientIdEntity(_, _) | 
ExplicitUserDefaultClientIdEntity(_) |
+           DefaultUserExplicitClientIdEntity(_) | 
DefaultUserDefaultClientIdEntity => true
+      case _ => false
+    }))
+  }
+
+  @Test
+  def testEntityWithDefaultName(): Unit = {
+    addQuotaRecord(manager, clientEntity(ConfigEntityName.Default), 
(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, 20000.0))
+    addQuotaRecord(manager, clientEntity(null), 
(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, 30000.0))
+
+    val components = mutable.ListBuffer[ClientQuotaFilterComponent]()
+    entityToFilter(clientEntity(ConfigEntityName.Default), components)
+    var results = cache.describeClientQuotas(components.toSeq, strict=true)
+    assertEquals(1, results.size)
+
+    components.clear()
+    entityToFilter(clientEntity(null), components)
+    results = cache.describeClientQuotas(components.toSeq, strict=true)
+    assertEquals(1, results.size)
+  }
+
+  @Test
+  def testQuotaRemoval(): Unit = {
+    val entity = userClientEntity("user", "client-id")
+    addQuotaRecord(manager, entity, 
(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, 10000.0))
+    addQuotaRecord(manager, entity, 
(QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG, 20000.0))
+    var quotas = describeEntity(entity)
+    assertEquals(2, quotas.size)
+    assertEquals(10000.0, 
quotas(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG), 1e-6)
+
+    addQuotaRecord(manager, entity, 
(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, 10001.0))
+    quotas = describeEntity(entity)
+    assertEquals(2, quotas.size)
+    assertEquals(10001.0, 
quotas(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG), 1e-6)
+
+    addQuotaRemovalRecord(manager, entity, 
QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG)
+    quotas = describeEntity(entity)
+    assertEquals(1, quotas.size)
+    
assertFalse(quotas.contains(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG))
+
+    addQuotaRemovalRecord(manager, entity, 
QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG)
+    quotas = describeEntity(entity)
+    assertEquals(0, quotas.size)
+
+    // Removing non-existent quota should not do anything
+    addQuotaRemovalRecord(manager, entity, 
QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG)
+    quotas = describeEntity(entity)
+    assertEquals(0, quotas.size)
+  }
+
+  @Test
+  def testDescribeClientQuotasInvalidFilterCombination(): Unit = {
+    val ipFilterComponent = 
ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.IP)
+    val userFilterComponent = 
ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.USER)
+    val clientIdFilterComponent = 
ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.CLIENT_ID)
+    val expectedExceptionMessage = "Invalid entity filter component 
combination"
+    assertThrows(classOf[InvalidRequestException],
+      () => cache.describeClientQuotas(Seq(ipFilterComponent, 
userFilterComponent), strict=false),
+      () => expectedExceptionMessage)
+    assertThrows(classOf[InvalidRequestException],
+      () => cache.describeClientQuotas(Seq(ipFilterComponent, 
clientIdFilterComponent), strict=false),
+      () => expectedExceptionMessage)
+    assertThrows(classOf[InvalidRequestException],
+      () => cache.describeClientQuotas(Seq(ipFilterComponent, 
ipFilterComponent), strict = false),
+      () => expectedExceptionMessage)
+    assertThrows(classOf[InvalidRequestException],
+      () => cache.describeClientQuotas(Seq(userFilterComponent, 
userFilterComponent), strict=false),
+      () => expectedExceptionMessage)
+  }
+
+  @Test
+  def testDescribeEmptyFilter(): Unit = {
+    var results = cache.describeClientQuotas(Seq.empty, strict=false)
+    assertEquals(0, results.size)
+
+    results = cache.describeClientQuotas(Seq.empty, strict=true)
+    assertEquals(0, results.size)
+  }
+
+  @Test
+  def testDescribeUnsupportedEntityType(): Unit = {
+    assertThrows(classOf[UnsupportedVersionException],
+      () => 
cache.describeClientQuotas(Seq(ClientQuotaFilterComponent.ofEntityType("other")),
 strict=false))
+  }
+
+  @Test
+  def testDescribeMissingEntityType(): Unit = {
+    assertThrows(classOf[InvalidRequestException],
+      () => 
cache.describeClientQuotas(Seq(ClientQuotaFilterComponent.ofEntity("", 
"name")), strict = false))
+  }
+
+  @Test
+  def testQuotaManagers(): Unit = {
+    val entity = userClientEntity("user", "client")
+    addQuotaRecord(manager, entity, 
(QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG, 100.0))
+    verify(manager.quotaManagers.fetch, times(1)).updateQuota(
+      _eq(Some("user")),
+      _eq(Some("client")),
+      _eq(Some("client")),
+      any(classOf[Option[Quota]])
+    )
+
+    addQuotaRecord(manager, entity, 
(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, 100.0))
+    verify(manager.quotaManagers.produce, times(1)).updateQuota(
+      _eq(Some("user")),
+      _eq(Some("client")),
+      _eq(Some("client")),
+      any(classOf[Option[Quota]])
+    )
+
+    addQuotaRecord(manager, entity, 
(QuotaConfigs.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, 100.0))
+    verify(manager.quotaManagers.request, times(1)).updateQuota(
+      _eq(Some("user")),
+      _eq(Some("client")),
+      _eq(Some("client")),
+      any(classOf[Option[Quota]])
+    )
+
+    addQuotaRecord(manager, entity, 
(QuotaConfigs.CONTROLLER_MUTATION_RATE_OVERRIDE_CONFIG, 100.0))
+    verify(manager.quotaManagers.controllerMutation, times(1)).updateQuota(
+      _eq(Some("user")),
+      _eq(Some("client")),
+      _eq(Some("client")),
+      any(classOf[Option[Quota]])
+    )
+
+    addQuotaRemovalRecord(manager, entity, 
QuotaConfigs.CONTROLLER_MUTATION_RATE_OVERRIDE_CONFIG)
+    verify(manager.quotaManagers.controllerMutation, times(1)).updateQuota(
+      _eq(Some("user")),
+      _eq(Some("client")),
+      _eq(Some("client")),
+      _eq(None)
+    )
+  }
+
+  @Test
+  def testIpQuota(): Unit = {
+    val defaultIp = ipEntity(null)
+    val knownIp = ipEntity("1.2.3.4")
+
+    addQuotaRecord(manager, defaultIp, 
(QuotaConfigs.IP_CONNECTION_RATE_OVERRIDE_CONFIG, 100.0))
+    addQuotaRecord(manager, knownIp, 
(QuotaConfigs.IP_CONNECTION_RATE_OVERRIDE_CONFIG, 99.0))
+
+    verify(manager.connectionQuotas, times(2)).updateIpConnectionRateQuota(
+      any(classOf[Option[InetAddress]]),
+      any(classOf[Option[Int]])
+    )
+
+    var quotas = describeEntity(defaultIp)
+    assertEquals(1, quotas.size)
+
+    quotas = describeEntity(knownIp)
+    assertEquals(1, quotas.size)
+
+    val results = 
cache.describeClientQuotas(Seq(ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.IP)),
 strict=false)
+    assertEquals(2, results.size)
+
+    reset(manager.connectionQuotas)
+    addQuotaRecord(manager, knownIp, 
(QuotaConfigs.IP_CONNECTION_RATE_OVERRIDE_CONFIG, 98.0))
+    verify(manager.connectionQuotas, times(1)).updateIpConnectionRateQuota(
+      any(classOf[Option[InetAddress]]),
+      _eq(Some(98))
+    )
+
+    reset(manager.connectionQuotas)
+    addQuotaRemovalRecord(manager, knownIp, 
QuotaConfigs.IP_CONNECTION_RATE_OVERRIDE_CONFIG)
+    verify(manager.connectionQuotas, times(1)).updateIpConnectionRateQuota(
+      any(classOf[Option[InetAddress]]),
+      _eq(None)
+    )
+  }
+
+  @Test
+  def testIpQuotaUnknownKey(): Unit = {
+    val defaultIp = ipEntity(null)
+    addQuotaRecord(manager, defaultIp, ("not-an-ip-quota-key", 100.0))
+    verify(manager.connectionQuotas, times(0)).updateIpConnectionRateQuota(
+      any(classOf[Option[InetAddress]]),
+      _eq(Some(100))
+    )
+
+    assertEquals(0, describeEntity(defaultIp).size)
+  }
+
+  @Test
+  def testUserQuotaUnknownKey(): Unit = {
+    val defaultUser = userEntity(null)
+    addQuotaRecord(manager, defaultUser, ("not-a-user-quota-key", 100.0))
+    assertEquals(0, describeEntity(defaultUser).size)
+  }
+
+  def setupAndVerify(manager: ClientQuotaMetadataManager,
+                     verifier: (List[QuotaRecord.EntityData], (String, 
Double)) => Unit ): Unit = {
+    val toVerify = List(
+      (userClientEntity("user-1", "client-id-1"), 50.50),
+      (userClientEntity("user-2", "client-id-1"), 51.51),
+      (userClientEntity("user-3", "client-id-2"), 52.52),
+      (userClientEntity(null, "client-id-1"), 53.53),
+      (userClientEntity("user-1", null), 54.54),
+      (userClientEntity("user-3", null), 55.55),
+      (userEntity("user-1"), 56.56),
+      (userEntity("user-2"), 57.57),
+      (userEntity("user-3"), 58.58),
+      (userEntity(null), 59.59),
+      (clientEntity("client-id-2"), 60.60),
+      (userClientEntity(null, null), 61.61)
+    )
+
+    toVerify.foreach {
+      case (entity, value) => addQuotaRecord(manager, entity, 
(QuotaConfigs.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, value))
+    }
+
+    toVerify.foreach {
+      case (entity, value) => verifier.apply(entity, 
(QuotaConfigs.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, value))
+    }
+  }
+
+  def describeEntity(entity: List[QuotaRecord.EntityData]): Map[String, 
Double] = {
+    val components = mutable.ListBuffer[ClientQuotaFilterComponent]()
+    entityToFilter(entity, components)
+    val results = cache.describeClientQuotas(components.toSeq, strict=true)
+    if (results.isEmpty) {
+      Map()
+    } else if (results.size == 1) {
+      results.head._2
+    } else {
+      throw new AssertionError("Matched more than one entity with strict=true 
describe filter")
+    }
+  }
+
+  def addQuotaRecord(manager: ClientQuotaMetadataManager, entity: 
List[QuotaRecord.EntityData], quota: (String, Double)): Unit = {
+    manager.handleQuotaRecord(new QuotaRecord()
+      .setEntity(entity.asJava)
+      .setKey(quota._1)
+      .setValue(quota._2))
+  }
+
+  def addQuotaRemovalRecord(manager: ClientQuotaMetadataManager, entity: 
List[QuotaRecord.EntityData], quota: String): Unit = {
+    manager.handleQuotaRecord(new QuotaRecord()
+      .setEntity(entity.asJava)
+      .setKey(quota)
+      .setRemove(true))
+  }
+
+  def entityToFilter(entity: List[QuotaRecord.EntityData], components: 
mutable.ListBuffer[ClientQuotaFilterComponent]): Unit = {
+    entity.foreach(entityData => {
+      if (entityData.entityName() == null) {
+        
components.append(ClientQuotaFilterComponent.ofDefaultEntity(entityData.entityType()))
+      } else {
+        
components.append(ClientQuotaFilterComponent.ofEntity(entityData.entityType(), 
entityData.entityName()))
+      }
+    })
+  }
+
+  def clientEntity(clientId: String): List[QuotaRecord.EntityData] = {
+    List(new 
QuotaRecord.EntityData().setEntityType(ClientQuotaEntity.CLIENT_ID).setEntityName(clientId))
+  }
+
+  def userEntity(user: String): List[QuotaRecord.EntityData] = {
+    List(new 
QuotaRecord.EntityData().setEntityType(ClientQuotaEntity.USER).setEntityName(user))
+  }
+
+  def userClientEntity(user: String, clientId: String): 
List[QuotaRecord.EntityData] = {
+    List(
+      new 
QuotaRecord.EntityData().setEntityType(ClientQuotaEntity.USER).setEntityName(user),
+      new 
QuotaRecord.EntityData().setEntityType(ClientQuotaEntity.CLIENT_ID).setEntityName(clientId)
+    )
+  }
+
+  def ipEntity(ip: String): List[QuotaRecord.EntityData] = {
+    List(new 
QuotaRecord.EntityData().setEntityType(ClientQuotaEntity.IP).setEntityName(ip))
+  }
+}

Reply via email to