This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1841c07d4af KAFKA-17449 Move Quota classes to server-common module (#17060)
1841c07d4af is described below

commit 1841c07d4af1daa55f5f332e7c844738c12e7f7c
Author: Mickael Maison <[email protected]>
AuthorDate: Sat Aug 31 06:41:34 2024 +0200

    KAFKA-17449 Move Quota classes to server-common module (#17060)
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 checkstyle/import-control.xml                      |   1 +
 .../java/kafka/log/remote/RemoteLogManager.java    |   6 +-
 .../kafka/log/remote/quota/RLMQuotaManager.java    |   4 +-
 .../main/scala/kafka/network/SocketServer.scala    |   1 +
 .../scala/kafka/server/ClientQuotaManager.scala    |   4 +-
 .../kafka/server/ClientRequestQuotaManager.scala   |  11 +-
 .../server/ControllerMutationQuotaManager.scala    |   8 +-
 .../src/main/scala/kafka/server/QuotaFactory.scala |  39 +-----
 .../kafka/server/ReplicationQuotaManager.scala     |   4 +-
 core/src/main/scala/kafka/utils/QuotaUtils.scala   |  75 -----------
 .../log/remote/quota/RLMQuotaManagerTest.java      |   5 +-
 .../integration/kafka/api/BaseQuotaTest.scala      |  17 +--
 .../kafka/api/CustomQuotaCallbackTest.scala        |   8 +-
 .../kafka/api/PlaintextConsumerTest.scala          |  21 +--
 .../unit/kafka/server/ClientQuotaManagerTest.scala |  22 ++--
 .../server/ClientRequestQuotaManagerTest.scala     |   4 +-
 .../ControllerMutationQuotaManagerTest.scala       |   4 +-
 .../kafka/server/ControllerMutationQuotaTest.scala |   3 +-
 .../kafka/server/ReplicationQuotaManagerTest.scala |  10 +-
 .../unit/kafka/server/ReplicationQuotasTest.scala  |   4 +-
 .../scala/unit/kafka/server/RequestQuotaTest.scala |  19 +--
 .../server/ThrottledChannelExpirationTest.scala    |   4 +-
 .../scala/unit/kafka/utils/QuotaUtilsTest.scala    | 134 -------------------
 .../org/apache/kafka/server/quota/QuotaType.java   |  50 +++++++
 .../org/apache/kafka/server/quota/QuotaUtils.java  |  79 ++++++++++++
 .../apache/kafka/server/quota/QuotaUtilsTest.java  | 143 +++++++++++++++++++++
 .../tools/other/ReplicationQuotasTestRig.java      |   6 +-
 27 files changed, 363 insertions(+), 323 deletions(-)

diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 0842f9189c2..285163816f2 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -293,6 +293,7 @@
     <allow pkg="org.apache.kafka.server.common" />
     <allow pkg="org.apache.kafka.server.log.remote.metadata.storage" />
     <allow pkg="org.apache.kafka.server.log.remote.storage" />
+    <allow pkg="org.apache.kafka.server.quota" />
     <allow pkg="org.apache.kafka.clients" />
     <allow pkg="org.apache.kafka.clients.admin" />
     <allow pkg="org.apache.kafka.clients.producer" />
diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index 3f4da991fe6..7606f7f7f67 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -22,7 +22,6 @@ import kafka.log.UnifiedLog;
 import kafka.log.remote.quota.RLMQuotaManager;
 import kafka.log.remote.quota.RLMQuotaManagerConfig;
 import kafka.log.remote.quota.RLMQuotaMetrics;
-import kafka.server.QuotaType;
 import kafka.server.StopPartition;
 
 import org.apache.kafka.common.KafkaException;
@@ -64,6 +63,7 @@ import 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
 import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
 import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
 import org.apache.kafka.server.metrics.KafkaMetricsGroup;
+import org.apache.kafka.server.quota.QuotaType;
 import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile;
 import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
 import org.apache.kafka.storage.internals.log.AbortedTxn;
@@ -291,12 +291,12 @@ public class RemoteLogManager implements Closeable {
     }
 
     RLMQuotaManager createRLMCopyQuotaManager() {
-        return new RLMQuotaManager(copyQuotaManagerConfig(rlmConfig), metrics, 
QuotaType.RLMCopy$.MODULE$,
+        return new RLMQuotaManager(copyQuotaManagerConfig(rlmConfig), metrics, 
QuotaType.RLM_COPY,
           "Tracking copy byte-rate for Remote Log Manager", time);
     }
 
     RLMQuotaManager createRLMFetchQuotaManager() {
-        return new RLMQuotaManager(fetchQuotaManagerConfig(rlmConfig), 
metrics, QuotaType.RLMFetch$.MODULE$,
+        return new RLMQuotaManager(fetchQuotaManagerConfig(rlmConfig), 
metrics, QuotaType.RLM_FETCH,
           "Tracking fetch byte-rate for Remote Log Manager", time);
     }
 
diff --git a/core/src/main/java/kafka/log/remote/quota/RLMQuotaManager.java b/core/src/main/java/kafka/log/remote/quota/RLMQuotaManager.java
index 0f695a21e63..eb677c7fee4 100644
--- a/core/src/main/java/kafka/log/remote/quota/RLMQuotaManager.java
+++ b/core/src/main/java/kafka/log/remote/quota/RLMQuotaManager.java
@@ -16,9 +16,7 @@
  */
 package kafka.log.remote.quota;
 
-import kafka.server.QuotaType;
 import kafka.server.SensorAccess;
-import kafka.utils.QuotaUtils;
 
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.metrics.KafkaMetric;
@@ -29,6 +27,8 @@ import 
org.apache.kafka.common.metrics.QuotaViolationException;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.SimpleRate;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.server.quota.QuotaType;
+import org.apache.kafka.server.quota.QuotaUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index d8545105689..a20d52f33f1 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -48,6 +48,7 @@ import org.apache.kafka.network.{ConnectionQuotaEntity, 
ConnectionThrottledExcep
 import org.apache.kafka.security.CredentialProvider
 import org.apache.kafka.server.config.QuotaConfigs
 import org.apache.kafka.server.metrics.KafkaMetricsGroup
+import org.apache.kafka.server.quota.QuotaUtils
 import org.apache.kafka.server.util.FutureUtils
 import org.slf4j.event.Level
 
diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
index bd12a51d3f3..144b9d2cfce 100644
--- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
@@ -21,7 +21,7 @@ import java.util.concurrent.{ConcurrentHashMap, DelayQueue, 
TimeUnit}
 import java.util.concurrent.locks.ReentrantReadWriteLock
 import kafka.network.RequestChannel
 import kafka.server.ClientQuotaManager._
-import kafka.utils.{Logging, QuotaUtils}
+import kafka.utils.Logging
 import org.apache.kafka.common.{Cluster, MetricName}
 import org.apache.kafka.common.metrics._
 import org.apache.kafka.common.metrics.Metrics
@@ -29,7 +29,7 @@ import org.apache.kafka.common.metrics.stats.{Avg, 
CumulativeSum, Rate}
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.utils.{Sanitizer, Time}
 import org.apache.kafka.server.config.{ClientQuotaManagerConfig, 
ZooKeeperInternals}
-import org.apache.kafka.server.quota.{ClientQuotaCallback, ClientQuotaEntity, 
ClientQuotaType, ThrottleCallback, ThrottledChannel}
+import org.apache.kafka.server.quota.{ClientQuotaCallback, ClientQuotaEntity, 
ClientQuotaType, QuotaType, QuotaUtils, ThrottleCallback, ThrottledChannel}
 import org.apache.kafka.server.util.ShutdownableThread
 import org.apache.kafka.network.Session
 
diff --git a/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala b/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala
index 437d6c371a2..248a7cb5125 100644
--- a/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala
@@ -18,13 +18,12 @@ package kafka.server
 
 import java.util.concurrent.TimeUnit
 import kafka.network.RequestChannel
-import kafka.utils.QuotaUtils
 import org.apache.kafka.common.MetricName
 import org.apache.kafka.common.metrics._
 import org.apache.kafka.common.metrics.stats.Rate
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.server.config.ClientQuotaManagerConfig
-import org.apache.kafka.server.quota.ClientQuotaCallback
+import org.apache.kafka.server.quota.{ClientQuotaCallback, QuotaType, 
QuotaUtils}
 
 import scala.jdk.CollectionConverters._
 
@@ -34,7 +33,7 @@ object ClientRequestQuotaManager {
   // create once.
   private val DefaultInactiveExemptSensorExpirationTimeSeconds: Long = 
Long.MaxValue
 
-  private val ExemptSensorName = "exempt-" + QuotaType.Request
+  private val ExemptSensorName = "exempt-" + QuotaType.REQUEST
 }
 
 class ClientRequestQuotaManager(private val config: ClientQuotaManagerConfig,
@@ -42,11 +41,11 @@ class ClientRequestQuotaManager(private val config: 
ClientQuotaManagerConfig,
                                 private val time: Time,
                                 private val threadNamePrefix: String,
                                 private val quotaCallback: 
Option[ClientQuotaCallback])
-    extends ClientQuotaManager(config, metrics, QuotaType.Request, time, 
threadNamePrefix, quotaCallback) {
+    extends ClientQuotaManager(config, metrics, QuotaType.REQUEST, time, 
threadNamePrefix, quotaCallback) {
 
   private val maxThrottleTimeMs = 
TimeUnit.SECONDS.toMillis(this.config.quotaWindowSizeSeconds)
   private val exemptMetricName = metrics.metricName("exempt-request-time",
-    QuotaType.Request.toString, "Tracking exempt-request-time utilization 
percentage")
+    QuotaType.REQUEST.toString, "Tracking exempt-request-time utilization 
percentage")
 
   val exemptSensor: Sensor = 
getOrCreateSensor(ClientRequestQuotaManager.ExemptSensorName,
     ClientRequestQuotaManager.DefaultInactiveExemptSensorExpirationTimeSeconds,
@@ -85,7 +84,7 @@ class ClientRequestQuotaManager(private val config: 
ClientQuotaManagerConfig,
   }
 
   override protected def clientQuotaMetricName(quotaMetricTags: Map[String, 
String]): MetricName = {
-    metrics.metricName("request-time", QuotaType.Request.toString,
+    metrics.metricName("request-time", QuotaType.REQUEST.toString,
       "Tracking request-time per user/client-id",
       quotaMetricTags.asJava)
   }
diff --git a/core/src/main/scala/kafka/server/ControllerMutationQuotaManager.scala b/core/src/main/scala/kafka/server/ControllerMutationQuotaManager.scala
index e1d38eb65ee..1a644e30d9a 100644
--- a/core/src/main/scala/kafka/server/ControllerMutationQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ControllerMutationQuotaManager.scala
@@ -27,7 +27,7 @@ import org.apache.kafka.common.metrics.stats.TokenBucket
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.network.Session
-import org.apache.kafka.server.quota.ClientQuotaCallback
+import org.apache.kafka.server.quota.{ClientQuotaCallback, QuotaType}
 import org.apache.kafka.server.config.ClientQuotaManagerConfig
 
 import scala.jdk.CollectionConverters._
@@ -166,16 +166,16 @@ class ControllerMutationQuotaManager(private val config: 
ClientQuotaManagerConfi
                                      private val time: Time,
                                      private val threadNamePrefix: String,
                                      private val quotaCallback: 
Option[ClientQuotaCallback])
-    extends ClientQuotaManager(config, metrics, QuotaType.ControllerMutation, 
time, threadNamePrefix, quotaCallback) {
+    extends ClientQuotaManager(config, metrics, QuotaType.CONTROLLER_MUTATION, 
time, threadNamePrefix, quotaCallback) {
 
   override protected def clientQuotaMetricName(quotaMetricTags: Map[String, 
String]): MetricName = {
-    metrics.metricName("tokens", QuotaType.ControllerMutation.toString,
+    metrics.metricName("tokens", QuotaType.CONTROLLER_MUTATION.toString,
       "Tracking remaining tokens in the token bucket per user/client-id",
       quotaMetricTags.asJava)
   }
 
   private def clientRateMetricName(quotaMetricTags: Map[String, String]): 
MetricName = {
-    metrics.metricName("mutation-rate", QuotaType.ControllerMutation.toString,
+    metrics.metricName("mutation-rate", QuotaType.CONTROLLER_MUTATION.toString,
       "Tracking mutation-rate per user/client-id",
       quotaMetricTags.asJava)
   }
diff --git a/core/src/main/scala/kafka/server/QuotaFactory.scala b/core/src/main/scala/kafka/server/QuotaFactory.scala
index f613463774a..c79ae37fc95 100644
--- a/core/src/main/scala/kafka/server/QuotaFactory.scala
+++ b/core/src/main/scala/kafka/server/QuotaFactory.scala
@@ -16,38 +16,13 @@
   */
 package kafka.server
 
-import kafka.server.QuotaType._
 import kafka.utils.Logging
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.server.quota.ClientQuotaCallback
+import org.apache.kafka.server.quota.{ClientQuotaCallback, QuotaType}
 import org.apache.kafka.common.utils.Time
-import org.apache.kafka.server.config.{ClientQuotaManagerConfig, 
ReplicationQuotaManagerConfig, QuotaConfigs}
-import org.apache.kafka.server.quota.ClientQuotaType
+import org.apache.kafka.server.config.{ClientQuotaManagerConfig, QuotaConfigs, 
ReplicationQuotaManagerConfig}
 
-object QuotaType  {
-  case object Fetch extends QuotaType
-  case object Produce extends QuotaType
-  case object Request extends QuotaType
-  case object ControllerMutation extends QuotaType
-  case object LeaderReplication extends QuotaType
-  case object FollowerReplication extends QuotaType
-  case object AlterLogDirsReplication extends QuotaType
-  case object RLMCopy extends QuotaType
-  case object RLMFetch extends QuotaType
-
-  def toClientQuotaType(quotaType: QuotaType): ClientQuotaType = {
-    quotaType match {
-      case QuotaType.Fetch => ClientQuotaType.FETCH
-      case QuotaType.Produce => ClientQuotaType.PRODUCE
-      case QuotaType.Request => ClientQuotaType.REQUEST
-      case QuotaType.ControllerMutation => ClientQuotaType.CONTROLLER_MUTATION
-      case _ => throw new IllegalArgumentException(s"Not a client quota type: 
$quotaType")
-    }
-  }
-}
-
-sealed trait QuotaType
 
 object QuotaFactory extends Logging {
 
@@ -79,14 +54,14 @@ object QuotaFactory extends Logging {
     val clientQuotaCallback = 
Option(cfg.getConfiguredInstance(QuotaConfigs.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG,
       classOf[ClientQuotaCallback]))
     QuotaManagers(
-      new ClientQuotaManager(clientConfig(cfg), metrics, Fetch, time, 
threadNamePrefix, clientQuotaCallback),
-      new ClientQuotaManager(clientConfig(cfg), metrics, Produce, time, 
threadNamePrefix, clientQuotaCallback),
+      new ClientQuotaManager(clientConfig(cfg), metrics, QuotaType.FETCH, 
time, threadNamePrefix, clientQuotaCallback),
+      new ClientQuotaManager(clientConfig(cfg), metrics, QuotaType.PRODUCE, 
time, threadNamePrefix, clientQuotaCallback),
       new ClientRequestQuotaManager(clientConfig(cfg), metrics, time, 
threadNamePrefix, clientQuotaCallback),
       new ControllerMutationQuotaManager(clientControllerMutationConfig(cfg), 
metrics, time,
         threadNamePrefix, clientQuotaCallback),
-      new ReplicationQuotaManager(replicationConfig(cfg), metrics, 
LeaderReplication, time),
-      new ReplicationQuotaManager(replicationConfig(cfg), metrics, 
FollowerReplication, time),
-      new ReplicationQuotaManager(alterLogDirsReplicationConfig(cfg), metrics, 
AlterLogDirsReplication, time),
+      new ReplicationQuotaManager(replicationConfig(cfg), metrics, 
QuotaType.LEADER_REPLICATION, time),
+      new ReplicationQuotaManager(replicationConfig(cfg), metrics, 
QuotaType.FOLLOWER_REPLICATION, time),
+      new ReplicationQuotaManager(alterLogDirsReplicationConfig(cfg), metrics, 
QuotaType.ALTER_LOG_DIRS_REPLICATION, time),
       clientQuotaCallback
     )
   }
diff --git a/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala b/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala
index 43f508f4a36..ce50b99ff54 100644
--- a/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala
@@ -18,18 +18,16 @@ package kafka.server
 
 import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
 import java.util.concurrent.locks.ReentrantReadWriteLock
-
 import scala.collection.Seq
-
 import kafka.server.Constants._
 import kafka.utils.CoreUtils._
 import kafka.utils.Logging
 import org.apache.kafka.common.metrics._
-
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.metrics.stats.SimpleRate
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.server.config.ReplicationQuotaManagerConfig
+import org.apache.kafka.server.quota.QuotaType
 
 trait ReplicaQuota {
   def record(value: Long): Unit
diff --git a/core/src/main/scala/kafka/utils/QuotaUtils.scala b/core/src/main/scala/kafka/utils/QuotaUtils.scala
deleted file mode 100755
index 93d5cdebcc3..00000000000
--- a/core/src/main/scala/kafka/utils/QuotaUtils.scala
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.utils
-
-import org.apache.kafka.common.MetricName
-import org.apache.kafka.common.metrics.{KafkaMetric, Measurable, 
QuotaViolationException}
-import org.apache.kafka.common.metrics.stats.Rate
-
-/**
- * Helper functions related to quotas
- */
-object QuotaUtils {
-
-  /**
-   * This calculates the amount of time needed to bring the observed rate 
within quota
-   * assuming that no new metrics are recorded.
-   *
-   * If O is the observed rate and T is the target rate over a window of W, to 
bring O down to T,
-   * we need to add a delay of X to W such that O * W / (W + X) = T.
-   * Solving for X, we get X = (O - T)/T * W.
-   *
-   * @param timeMs current time in milliseconds
-   * @return Delay in milliseconds
-   */
-  def throttleTime(e: QuotaViolationException, timeMs: Long): Long = {
-    val difference = e.value - e.bound
-    // Use the precise window used by the rate calculation
-    val throttleTimeMs = difference / e.bound * windowSize(e.metric, timeMs)
-    Math.round(throttleTimeMs)
-  }
-
-  /**
-   * Calculates the amount of time needed to bring the observed rate within 
quota using the same algorithm as
-   * throttleTime() utility method but the returned value is capped to given 
maxThrottleTime
-   */
-  def boundedThrottleTime(e: QuotaViolationException, maxThrottleTime: Long, 
timeMs: Long): Long = {
-    math.min(throttleTime(e, timeMs), maxThrottleTime)
-  }
-
-  /**
-   * Returns window size of the given metric
-   *
-   * @param metric metric with measurable of type Rate
-   * @param timeMs current time in milliseconds
-   * @throws IllegalArgumentException if given measurable is not Rate
-   */
-  private def windowSize(metric: KafkaMetric, timeMs: Long): Long =
-    measurableAsRate(metric.metricName, 
metric.measurable).windowSize(metric.config, timeMs)
-
-  /**
-   * Casts provided Measurable to Rate
-   * @throws IllegalArgumentException if given measurable is not Rate
-   */
-  private def measurableAsRate(name: MetricName, measurable: Measurable): Rate 
= {
-    measurable match {
-      case r: Rate => r
-      case _ => throw new IllegalArgumentException(s"Metric $name is not a 
Rate metric, value $measurable")
-    }
-  }
-}
diff --git a/core/src/test/java/kafka/log/remote/quota/RLMQuotaManagerTest.java b/core/src/test/java/kafka/log/remote/quota/RLMQuotaManagerTest.java
index ce6c3ce3ecc..53236e78d44 100644
--- a/core/src/test/java/kafka/log/remote/quota/RLMQuotaManagerTest.java
+++ b/core/src/test/java/kafka/log/remote/quota/RLMQuotaManagerTest.java
@@ -16,14 +16,13 @@
  */
 package kafka.log.remote.quota;
 
-import kafka.server.QuotaType;
-
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Quota;
 import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.server.quota.QuotaType;
 
 import org.junit.jupiter.api.Test;
 
@@ -39,7 +38,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 public class RLMQuotaManagerTest {
     private final MockTime time = new MockTime();
     private final Metrics metrics = new Metrics(new MetricConfig(), 
Collections.emptyList(), time);
-    private static final QuotaType QUOTA_TYPE = QuotaType.RLMFetch$.MODULE$;
+    private static final QuotaType QUOTA_TYPE = QuotaType.RLM_FETCH;
     private static final String DESCRIPTION = "Tracking byte rate";
 
     @Test
diff --git a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
index e27fa687f11..5f9f9aac361 100644
--- a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
@@ -20,7 +20,7 @@ import java.util.concurrent.TimeUnit
 import java.util.{Collections, Properties}
 import com.yammer.metrics.core.{Histogram, Meter}
 import kafka.api.QuotaTestClients._
-import kafka.server.{ClientQuotaManager, KafkaBroker, QuotaType}
+import kafka.server.{ClientQuotaManager, KafkaBroker}
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.admin.Admin
 import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig}
@@ -33,8 +33,9 @@ import org.apache.kafka.common.quota.ClientQuotaAlteration
 import org.apache.kafka.common.quota.ClientQuotaEntity
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
-import org.apache.kafka.server.config.{ServerConfigs, QuotaConfigs}
+import org.apache.kafka.server.config.{QuotaConfigs, ServerConfigs}
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
+import org.apache.kafka.server.quota.QuotaType
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{BeforeEach, TestInfo}
 import org.junit.jupiter.params.ParameterizedTest
@@ -178,7 +179,7 @@ abstract class BaseQuotaTest extends IntegrationTestHarness 
{
     while ((!throttled || quotaTestClients.exemptRequestMetric == null || 
metricValue(quotaTestClients.exemptRequestMetric) <= 0)
       && System.currentTimeMillis < endTimeMs) {
       consumer.poll(Duration.ofMillis(100L))
-      val throttleMetric = quotaTestClients.throttleMetric(QuotaType.Request, 
consumerClientId)
+      val throttleMetric = quotaTestClients.throttleMetric(QuotaType.REQUEST, 
consumerClientId)
       throttled = throttleMetric != null && metricValue(throttleMetric) > 0
     }
 
@@ -221,7 +222,7 @@ abstract class QuotaTestClients(topic: String,
         new ErrorLoggingCallback(topic, null, null, true))
       numProduced += 1
       do {
-        val metric = throttleMetric(QuotaType.Produce, producerClientId)
+        val metric = throttleMetric(QuotaType.PRODUCE, producerClientId)
         throttled = metric != null && metricValue(metric) > 0
       } while (!future.isDone && (!throttled || waitForRequestCompletion))
     } while (numProduced < maxRecords && !throttled)
@@ -237,7 +238,7 @@ abstract class QuotaTestClients(topic: String,
     val startMs = System.currentTimeMillis
     do {
       numConsumed += consumer.poll(Duration.ofMillis(100L)).count
-      val metric = throttleMetric(QuotaType.Fetch, consumerClientId)
+      val metric = throttleMetric(QuotaType.FETCH, consumerClientId)
       throttled = metric != null && metricValue(metric) > 0
     } while (numConsumed < maxRecords && !throttled && 
System.currentTimeMillis < startMs + timeoutMs)
 
@@ -266,7 +267,7 @@ abstract class QuotaTestClients(topic: String,
 
   def verifyProduceThrottle(expectThrottle: Boolean, verifyClientMetric: 
Boolean = true,
                             verifyRequestChannelMetric: Boolean = true): Unit 
= {
-    verifyThrottleTimeMetric(QuotaType.Produce, producerClientId, 
expectThrottle)
+    verifyThrottleTimeMetric(QuotaType.PRODUCE, producerClientId, 
expectThrottle)
     if (verifyRequestChannelMetric)
       verifyThrottleTimeRequestChannelMetric(ApiKeys.PRODUCE, "", 
producerClientId, expectThrottle)
     if (verifyClientMetric)
@@ -275,7 +276,7 @@ abstract class QuotaTestClients(topic: String,
 
   def verifyConsumeThrottle(expectThrottle: Boolean, verifyClientMetric: 
Boolean = true,
                             verifyRequestChannelMetric: Boolean = true): Unit 
= {
-    verifyThrottleTimeMetric(QuotaType.Fetch, consumerClientId, expectThrottle)
+    verifyThrottleTimeMetric(QuotaType.FETCH, consumerClientId, expectThrottle)
     if (verifyRequestChannelMetric)
       verifyThrottleTimeRequestChannelMetric(ApiKeys.FETCH, "Consumer", 
consumerClientId, expectThrottle)
     if (verifyClientMetric)
@@ -318,7 +319,7 @@ abstract class QuotaTestClients(topic: String,
   }
 
   def exemptRequestMetric: KafkaMetric = {
-    val metricName = leaderNode.metrics.metricName("exempt-request-time", 
QuotaType.Request.toString, "")
+    val metricName = leaderNode.metrics.metricName("exempt-request-time", 
QuotaType.REQUEST.toString, "")
     leaderNode.metrics.metrics.get(metricName)
   }
 
diff --git a/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala b/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala
index 7403f8e6e05..7f1eaa6c300 100644
--- a/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala
+++ b/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala
@@ -294,10 +294,10 @@ class CustomQuotaCallbackTest extends 
IntegrationTestHarness with SaslSetup {
         
leaderNode.metrics.removeSensor(s"${quotaType}ThrottleTime-$sensorSuffix")
         leaderNode.metrics.removeSensor(s"$quotaType-$sensorSuffix")
       }
-      removeSensors(QuotaType.Produce, producerClientId)
-      removeSensors(QuotaType.Fetch, consumerClientId)
-      removeSensors(QuotaType.Request, producerClientId)
-      removeSensors(QuotaType.Request, consumerClientId)
+      removeSensors(QuotaType.PRODUCE, producerClientId)
+      removeSensors(QuotaType.FETCH, consumerClientId)
+      removeSensors(QuotaType.REQUEST, producerClientId)
+      removeSensors(QuotaType.REQUEST, consumerClientId)
     }
 
     private def quotaEntityName(userGroup: String): String = s"${userGroup}_"
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index 8a00feacb0b..638e8d232d1 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -16,7 +16,7 @@ import java.time.Duration
 import java.util
 import java.util.Arrays.asList
 import java.util.{Collections, Locale, Optional, Properties}
-import kafka.server.{KafkaBroker, QuotaType}
+import kafka.server.KafkaBroker
 import kafka.utils.{TestInfoUtils, TestUtils}
 import org.apache.kafka.clients.admin.{NewPartitions, NewTopic}
 import org.apache.kafka.clients.consumer._
@@ -27,6 +27,7 @@ import org.apache.kafka.common.header.Headers
 import org.apache.kafka.common.record.{CompressionType, TimestampType}
 import org.apache.kafka.common.serialization._
 import org.apache.kafka.common.{MetricName, TopicPartition}
+import org.apache.kafka.server.quota.QuotaType
 import org.apache.kafka.test.{MockConsumerInterceptor, MockProducerInterceptor}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Timeout
@@ -629,15 +630,15 @@ class PlaintextConsumerTest extends BaseConsumerTest {
                                   "client-id", clientId)
         assertNull(broker.metrics.metric(metricName), "Metric should not have 
been created " + metricName)
     }
-    brokers.foreach(assertNoMetric(_, "byte-rate", QuotaType.Produce, 
producerClientId))
-    brokers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Produce, 
producerClientId))
-    brokers.foreach(assertNoMetric(_, "byte-rate", QuotaType.Fetch, 
consumerClientId))
-    brokers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Fetch, 
consumerClientId))
-
-    brokers.foreach(assertNoMetric(_, "request-time", QuotaType.Request, 
producerClientId))
-    brokers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Request, 
producerClientId))
-    brokers.foreach(assertNoMetric(_, "request-time", QuotaType.Request, 
consumerClientId))
-    brokers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Request, 
consumerClientId))
+    brokers.foreach(assertNoMetric(_, "byte-rate", QuotaType.PRODUCE, 
producerClientId))
+    brokers.foreach(assertNoMetric(_, "throttle-time", QuotaType.PRODUCE, 
producerClientId))
+    brokers.foreach(assertNoMetric(_, "byte-rate", QuotaType.FETCH, 
consumerClientId))
+    brokers.foreach(assertNoMetric(_, "throttle-time", QuotaType.FETCH, 
consumerClientId))
+
+    brokers.foreach(assertNoMetric(_, "request-time", QuotaType.REQUEST, 
producerClientId))
+    brokers.foreach(assertNoMetric(_, "throttle-time", QuotaType.REQUEST, 
producerClientId))
+    brokers.foreach(assertNoMetric(_, "request-time", QuotaType.REQUEST, 
consumerClientId))
+    brokers.foreach(assertNoMetric(_, "throttle-time", QuotaType.REQUEST, 
consumerClientId))
   }
 
   @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
index 59e206304a3..4a4107226dc 100644
--- a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
@@ -17,12 +17,12 @@
 package kafka.server
 
 import java.net.InetAddress
-import kafka.server.QuotaType._
 import org.apache.kafka.common.metrics.Quota
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.utils.Sanitizer
 import org.apache.kafka.server.config.{ClientQuotaManagerConfig, 
ZooKeeperInternals}
 import org.apache.kafka.network.Session
+import org.apache.kafka.server.quota.QuotaType
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Test
 
@@ -30,7 +30,7 @@ class ClientQuotaManagerTest extends 
BaseClientQuotaManagerTest {
   private val config = new ClientQuotaManagerConfig()
 
   private def testQuotaParsing(config: ClientQuotaManagerConfig, client1: 
UserClient, client2: UserClient, randomClient: UserClient, defaultConfigClient: 
UserClient): Unit = {
-    val clientQuotaManager = new ClientQuotaManager(config, metrics, Produce, 
time, "")
+    val clientQuotaManager = new ClientQuotaManager(config, metrics, 
QuotaType.PRODUCE, time, "")
 
     try {
       // Case 1: Update the quota. Assert that the new quota value is returned
@@ -160,7 +160,7 @@ class ClientQuotaManagerTest extends 
BaseClientQuotaManagerTest {
   def testGetMaxValueInQuotaWindowWithNonDefaultQuotaWindow(): Unit = {
     val numFullQuotaWindows = 3   // 3 seconds window (vs. 10 seconds default)
     val nonDefaultConfig = new ClientQuotaManagerConfig(numFullQuotaWindows + 
1)
-    val clientQuotaManager = new ClientQuotaManager(nonDefaultConfig, metrics, 
Fetch, time, "")
+    val clientQuotaManager = new ClientQuotaManager(nonDefaultConfig, metrics, 
QuotaType.FETCH, time, "")
     val userSession = new Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, 
"userA"), InetAddress.getLocalHost)
 
     try {
@@ -179,7 +179,7 @@ class ClientQuotaManagerTest extends 
BaseClientQuotaManagerTest {
   def testSetAndRemoveDefaultUserQuota(): Unit = {
     // quotaTypesEnabled will be QuotaTypes.NoQuotas initially
     val clientQuotaManager = new ClientQuotaManager(new 
ClientQuotaManagerConfig(),
-      metrics, Produce, time, "")
+      metrics, QuotaType.PRODUCE, time, "")
 
     try {
       // no quota set yet, should not throttle
@@ -201,7 +201,7 @@ class ClientQuotaManagerTest extends 
BaseClientQuotaManagerTest {
   def testSetAndRemoveUserQuota(): Unit = {
     // quotaTypesEnabled will be QuotaTypes.NoQuotas initially
     val clientQuotaManager = new ClientQuotaManager(new 
ClientQuotaManagerConfig(),
-      metrics, Produce, time, "")
+      metrics, QuotaType.PRODUCE, time, "")
 
     try {
       // Set <user> quota config
@@ -220,7 +220,7 @@ class ClientQuotaManagerTest extends 
BaseClientQuotaManagerTest {
   def testSetAndRemoveUserClientQuota(): Unit = {
     // quotaTypesEnabled will be QuotaTypes.NoQuotas initially
     val clientQuotaManager = new ClientQuotaManager(new 
ClientQuotaManagerConfig(),
-      metrics, Produce, time, "")
+      metrics, QuotaType.PRODUCE, time, "")
 
     try {
       // Set <user, client-id> quota config
@@ -238,7 +238,7 @@ class ClientQuotaManagerTest extends 
BaseClientQuotaManagerTest {
   @Test
   def testQuotaConfigPrecedence(): Unit = {
     val clientQuotaManager = new ClientQuotaManager(new 
ClientQuotaManagerConfig(),
-      metrics, Produce, time, "")
+      metrics, QuotaType.PRODUCE, time, "")
 
     try {
       clientQuotaManager.updateQuota(Some(ZooKeeperInternals.DEFAULT_STRING), 
None, None, Some(new Quota(1000, true)))
@@ -301,7 +301,7 @@ class ClientQuotaManagerTest extends 
BaseClientQuotaManagerTest {
 
   @Test
   def testQuotaViolation(): Unit = {
-    val clientQuotaManager = new ClientQuotaManager(config, metrics, Produce, 
time, "")
+    val clientQuotaManager = new ClientQuotaManager(config, metrics, 
QuotaType.PRODUCE, time, "")
     val queueSizeMetric = 
metrics.metrics().get(metrics.metricName("queue-size", "Produce", ""))
     try {
       clientQuotaManager.updateQuota(None, 
Some(ZooKeeperInternals.DEFAULT_STRING), 
Some(ZooKeeperInternals.DEFAULT_STRING),
@@ -350,7 +350,7 @@ class ClientQuotaManagerTest extends 
BaseClientQuotaManagerTest {
 
   @Test
   def testExpireThrottleTimeSensor(): Unit = {
-    val clientQuotaManager = new ClientQuotaManager(config, metrics, Produce, 
time, "")
+    val clientQuotaManager = new ClientQuotaManager(config, metrics, 
QuotaType.PRODUCE, time, "")
     try {
       clientQuotaManager.updateQuota(None, 
Some(ZooKeeperInternals.DEFAULT_STRING), 
Some(ZooKeeperInternals.DEFAULT_STRING),
         Some(new Quota(500, true)))
@@ -372,7 +372,7 @@ class ClientQuotaManagerTest extends 
BaseClientQuotaManagerTest {
 
   @Test
   def testExpireQuotaSensors(): Unit = {
-    val clientQuotaManager = new ClientQuotaManager(config, metrics, Produce, 
time, "")
+    val clientQuotaManager = new ClientQuotaManager(config, metrics, 
QuotaType.PRODUCE, time, "")
     try {
       clientQuotaManager.updateQuota(None, 
Some(ZooKeeperInternals.DEFAULT_STRING), 
Some(ZooKeeperInternals.DEFAULT_STRING),
         Some(new Quota(500, true)))
@@ -398,7 +398,7 @@ class ClientQuotaManagerTest extends 
BaseClientQuotaManagerTest {
 
   @Test
   def testClientIdNotSanitized(): Unit = {
-    val clientQuotaManager = new ClientQuotaManager(config, metrics, Produce, 
time, "")
+    val clientQuotaManager = new ClientQuotaManager(config, metrics, 
QuotaType.PRODUCE, time, "")
     val clientId = "client@#$%"
     try {
       clientQuotaManager.updateQuota(None, 
Some(ZooKeeperInternals.DEFAULT_STRING), 
Some(ZooKeeperInternals.DEFAULT_STRING),
diff --git 
a/core/src/test/scala/unit/kafka/server/ClientRequestQuotaManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ClientRequestQuotaManagerTest.scala
index 31a3e6e753d..ec09f3a7d6d 100644
--- a/core/src/test/scala/unit/kafka/server/ClientRequestQuotaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ClientRequestQuotaManagerTest.scala
@@ -16,9 +16,9 @@
  */
 package kafka.server
 
-import kafka.server.QuotaType.Request
 import org.apache.kafka.common.metrics.Quota
 import org.apache.kafka.server.config.ClientQuotaManagerConfig
+import org.apache.kafka.server.quota.QuotaType
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Test
 
@@ -29,7 +29,7 @@ class ClientRequestQuotaManagerTest extends 
BaseClientQuotaManagerTest {
   def testRequestPercentageQuotaViolation(): Unit = {
     val clientRequestQuotaManager = new ClientRequestQuotaManager(config, 
metrics, time, "", None)
     clientRequestQuotaManager.updateQuota(Some("ANONYMOUS"), 
Some("test-client"), Some("test-client"), Some(Quota.upperBound(1)))
-    val queueSizeMetric = 
metrics.metrics().get(metrics.metricName("queue-size", Request.toString, ""))
+    val queueSizeMetric = 
metrics.metrics().get(metrics.metricName("queue-size", 
QuotaType.REQUEST.toString, ""))
     def millisToPercent(millis: Double) = millis * 1000 * 1000 * 
ClientRequestQuotaManager.NanosToPercentagePerSecond
     try {
       // We have 10 second windows. Make sure that there is no quota violation
diff --git 
a/core/src/test/scala/unit/kafka/server/ControllerMutationQuotaManagerTest.scala
 
b/core/src/test/scala/unit/kafka/server/ControllerMutationQuotaManagerTest.scala
index dea6d554f1a..8fa4a290cc7 100644
--- 
a/core/src/test/scala/unit/kafka/server/ControllerMutationQuotaManagerTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/ControllerMutationQuotaManagerTest.scala
@@ -17,7 +17,6 @@
 package kafka.server
 
 import java.util.concurrent.TimeUnit
-import kafka.server.QuotaType.ControllerMutation
 import org.apache.kafka.common.errors.ThrottlingQuotaExceededException
 import org.apache.kafka.common.metrics.MetricConfig
 import org.apache.kafka.common.metrics.Metrics
@@ -26,6 +25,7 @@ import org.apache.kafka.common.metrics.QuotaViolationException
 import org.apache.kafka.common.metrics.stats.TokenBucket
 import org.apache.kafka.common.utils.MockTime
 import org.apache.kafka.server.config.ClientQuotaManagerConfig
+import org.apache.kafka.server.quota.QuotaType
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.Assertions.assertFalse
@@ -148,7 +148,7 @@ class ControllerMutationQuotaManagerTest extends 
BaseClientQuotaManagerTest {
       quotaManager.updateQuota(Some(User), Some(ClientId), Some(ClientId),
         Some(Quota.upperBound(10)))
       val queueSizeMetric = metrics.metrics().get(
-        metrics.metricName("queue-size", ControllerMutation.toString, ""))
+        metrics.metricName("queue-size", 
QuotaType.CONTROLLER_MUTATION.toString, ""))
 
       // Verify that there is no quota violation if we remain under the quota.
       for (_ <- 0 until 10) {
diff --git 
a/core/src/test/scala/unit/kafka/server/ControllerMutationQuotaTest.scala 
b/core/src/test/scala/unit/kafka/server/ControllerMutationQuotaTest.scala
index 7886b851b97..e0f8be559f7 100644
--- a/core/src/test/scala/unit/kafka/server/ControllerMutationQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ControllerMutationQuotaTest.scala
@@ -43,6 +43,7 @@ import 
org.apache.kafka.common.security.auth.AuthenticationContext
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import 
org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
 import org.apache.kafka.server.config.{QuotaConfigs, ServerConfigs}
+import org.apache.kafka.server.quota.QuotaType
 import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
 import org.apache.kafka.test.{TestUtils => JTestUtils}
 import org.junit.jupiter.api.Assertions.assertEquals
@@ -406,7 +407,7 @@ class ControllerMutationQuotaTest extends BaseRequestTest {
       else brokers.head.metrics
     val metricName = metrics.metricName(
       "tokens",
-      QuotaType.ControllerMutation.toString,
+      QuotaType.CONTROLLER_MUTATION.toString,
       "Tracking remaining tokens in the token bucket per user/client-id",
       Map(DefaultTags.User -> user, DefaultTags.ClientId -> "").asJava)
     Option(metrics.metric(metricName))
diff --git 
a/core/src/test/scala/unit/kafka/server/ReplicationQuotaManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicationQuotaManagerTest.scala
index 7f2346d36a0..ff90883a4a4 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicationQuotaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicationQuotaManagerTest.scala
@@ -17,11 +17,11 @@
 package kafka.server
 
 import java.util.Collections
-import kafka.server.QuotaType._
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.metrics.{MetricConfig, Metrics, Quota}
 import org.apache.kafka.common.utils.MockTime
 import org.apache.kafka.server.config.ReplicationQuotaManagerConfig
+import org.apache.kafka.server.quota.QuotaType
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
 import org.junit.jupiter.api.{AfterEach, Test}
 
@@ -38,7 +38,7 @@ class ReplicationQuotaManagerTest {
 
   @Test
   def shouldThrottleOnlyDefinedReplicas(): Unit = {
-    val quota = new ReplicationQuotaManager(new 
ReplicationQuotaManagerConfig(), metrics, QuotaType.Fetch, time)
+    val quota = new ReplicationQuotaManager(new 
ReplicationQuotaManagerConfig(), metrics, QuotaType.FETCH, time)
     quota.markThrottled("topic1", Seq(1, 2, 3))
 
     assertTrue(quota.isThrottled(tp1(1)))
@@ -49,7 +49,7 @@ class ReplicationQuotaManagerTest {
 
   @Test
   def shouldExceedQuotaThenReturnBackBelowBoundAsTimePasses(): Unit = {
-    val quota = new ReplicationQuotaManager(new 
ReplicationQuotaManagerConfig(10, 1), metrics, LeaderReplication, time)
+    val quota = new ReplicationQuotaManager(new 
ReplicationQuotaManagerConfig(10, 1), metrics, QuotaType.LEADER_REPLICATION, 
time)
 
     //Given
     quota.updateQuota(new Quota(100, true))
@@ -103,14 +103,14 @@ class ReplicationQuotaManagerTest {
   }
 
   def rate(metrics: Metrics): Double = {
-    val metricName = metrics.metricName("byte-rate", 
LeaderReplication.toString, "Tracking byte-rate for " + LeaderReplication)
+    val metricName = metrics.metricName("byte-rate", 
QuotaType.LEADER_REPLICATION.toString, "Tracking byte-rate for " + 
QuotaType.LEADER_REPLICATION)
     val leaderThrottledRate = 
metrics.metrics.asScala(metricName).metricValue.asInstanceOf[Double]
     leaderThrottledRate
   }
 
   @Test
   def shouldSupportWildcardThrottledReplicas(): Unit = {
-    val quota = new ReplicationQuotaManager(new 
ReplicationQuotaManagerConfig(), metrics, LeaderReplication, time)
+    val quota = new ReplicationQuotaManager(new 
ReplicationQuotaManagerConfig(), metrics, QuotaType.LEADER_REPLICATION, time)
 
     //When
     quota.markThrottled("MyTopic")
diff --git a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
index c9984c24baa..2dad987bc61 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
@@ -21,7 +21,6 @@ import java.util.AbstractMap.SimpleImmutableEntry
 import java.util.{Collections, Properties}
 import java.util.Map.Entry
 import kafka.server.KafkaConfig.fromProps
-import kafka.server.QuotaType._
 import kafka.utils.TestUtils._
 import kafka.utils.CoreUtils._
 import org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET
@@ -37,6 +36,7 @@ import 
org.apache.kafka.common.security.auth.SecurityProtocol.PLAINTEXT
 import org.apache.kafka.controller.ControllerRequestContextUtil
 import org.apache.kafka.server.common.{Features, MetadataVersion}
 import org.apache.kafka.server.config.QuotaConfigs
+import org.apache.kafka.server.quota.QuotaType
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.AfterEach
 import org.junit.jupiter.params.ParameterizedTest
@@ -198,7 +198,7 @@ class ReplicationQuotasTest extends QuorumTestHarness {
     // In a short test the brokers can be read unfairly, so assert against the 
average
     val rateUpperBound = throttle * 1.1
     val rateLowerBound = throttle * 0.5
-    val rate = if (leaderThrottle) avRate(LeaderReplication, 100 to 105) else 
avRate(FollowerReplication, 106 to 107)
+    val rate = if (leaderThrottle) avRate(QuotaType.LEADER_REPLICATION, 100 to 
105) else avRate(QuotaType.FOLLOWER_REPLICATION, 106 to 107)
     assertTrue(rate < rateUpperBound, s"Expected $rate < $rateUpperBound")
     assertTrue(rate > rateLowerBound, s"Expected $rate > $rateLowerBound")
   }
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala 
b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index a98b7cb2ded..8be1a9787f5 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -47,6 +47,7 @@ import org.apache.kafka.metadata.authorizer.StandardAuthorizer
 import org.apache.kafka.network.Session
 import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext, 
AuthorizationResult}
 import org.apache.kafka.server.config.{QuotaConfigs, ServerConfigs}
+import org.apache.kafka.server.quota.QuotaType
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
 import org.junit.jupiter.params.ParameterizedTest
@@ -229,7 +230,7 @@ class RequestQuotaTest extends BaseRequestTest {
   def session(user: String): Session = new Session(new 
KafkaPrincipal(KafkaPrincipal.USER_TYPE, user), null)
 
   private def throttleTimeMetricValue(clientId: String): Double = {
-    throttleTimeMetricValueForQuotaType(clientId, QuotaType.Request)
+    throttleTimeMetricValueForQuotaType(clientId, QuotaType.REQUEST)
   }
 
   private def throttleTimeMetricValueForQuotaType(clientId: String, quotaType: 
QuotaType): Double = {
@@ -241,7 +242,7 @@ class RequestQuotaTest extends BaseRequestTest {
   }
 
   private def requestTimeMetricValue(clientId: String): Double = {
-    val metricName = leaderNode.metrics.metricName("request-time", 
QuotaType.Request.toString,
+    val metricName = leaderNode.metrics.metricName("request-time", 
QuotaType.REQUEST.toString,
       "", "user", "", "client-id", clientId)
     val sensor = 
leaderNode.quotaManagers.request.getOrCreateQuotaSensors(session("ANONYMOUS"),
       clientId).quotaSensor
@@ -249,7 +250,7 @@ class RequestQuotaTest extends BaseRequestTest {
   }
 
   private def exemptRequestMetricValue: Double = {
-    val metricName = leaderNode.metrics.metricName("exempt-request-time", 
QuotaType.Request.toString, "")
+    val metricName = leaderNode.metrics.metricName("exempt-request-time", 
QuotaType.REQUEST.toString, "")
     metricValue(leaderNode.metrics.metrics.get(metricName), 
leaderNode.quotaManagers.request.exemptSensor)
   }
 
@@ -781,8 +782,8 @@ class RequestQuotaTest extends BaseRequestTest {
     override def toString: String = {
       val requestTime = requestTimeMetricValue(clientId)
       val throttleTime = throttleTimeMetricValue(clientId)
-      val produceThrottleTime = throttleTimeMetricValueForQuotaType(clientId, 
QuotaType.Produce)
-      val consumeThrottleTime = throttleTimeMetricValueForQuotaType(clientId, 
QuotaType.Fetch)
+      val produceThrottleTime = throttleTimeMetricValueForQuotaType(clientId, 
QuotaType.PRODUCE)
+      val consumeThrottleTime = throttleTimeMetricValueForQuotaType(clientId, 
QuotaType.FETCH)
       s"Client $clientId apiKey $apiKey requests $correlationId requestTime 
$requestTime " +
       s"throttleTime $throttleTime produceThrottleTime $produceThrottleTime 
consumeThrottleTime $consumeThrottleTime"
     }
@@ -826,9 +827,9 @@ class RequestQuotaTest extends BaseRequestTest {
     val throttled = smallQuotaProducerClient.runUntil(_.throttleTimeMs > 0)
 
     assertTrue(throttled, s"Response not throttled: $smallQuotaProducerClient")
-    assertTrue(throttleTimeMetricValueForQuotaType(smallQuotaProducerClientId, 
QuotaType.Produce) > 0,
+    assertTrue(throttleTimeMetricValueForQuotaType(smallQuotaProducerClientId, 
QuotaType.PRODUCE) > 0,
       s"Throttle time metrics for produce quota not updated: 
$smallQuotaProducerClient")
-    assertTrue(throttleTimeMetricValueForQuotaType(smallQuotaProducerClientId, 
QuotaType.Request).isNaN,
+    assertTrue(throttleTimeMetricValueForQuotaType(smallQuotaProducerClientId, 
QuotaType.REQUEST).isNaN,
       s"Throttle time metrics for request quota updated: 
$smallQuotaProducerClient")
   }
 
@@ -839,9 +840,9 @@ class RequestQuotaTest extends BaseRequestTest {
     val throttled = smallQuotaConsumerClient.runUntil(_.throttleTimeMs > 0)
 
     assertTrue(throttled, s"Response not throttled: 
$smallQuotaConsumerClientId")
-    assertTrue(throttleTimeMetricValueForQuotaType(smallQuotaConsumerClientId, 
QuotaType.Fetch) > 0,
+    assertTrue(throttleTimeMetricValueForQuotaType(smallQuotaConsumerClientId, 
QuotaType.FETCH) > 0,
       s"Throttle time metrics for consumer quota not updated: 
$smallQuotaConsumerClient")
-    assertTrue(throttleTimeMetricValueForQuotaType(smallQuotaConsumerClientId, 
QuotaType.Request).isNaN,
+    assertTrue(throttleTimeMetricValueForQuotaType(smallQuotaConsumerClientId, 
QuotaType.REQUEST).isNaN,
       s"Throttle time metrics for request quota updated: 
$smallQuotaConsumerClient")
   }
 
diff --git 
a/core/src/test/scala/unit/kafka/server/ThrottledChannelExpirationTest.scala 
b/core/src/test/scala/unit/kafka/server/ThrottledChannelExpirationTest.scala
index 06def9021ce..6d8adef1f8b 100644
--- a/core/src/test/scala/unit/kafka/server/ThrottledChannelExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ThrottledChannelExpirationTest.scala
@@ -23,7 +23,7 @@ import java.util.concurrent.DelayQueue
 import org.apache.kafka.common.metrics.MetricConfig
 import org.apache.kafka.common.utils.MockTime
 import org.apache.kafka.server.config.ClientQuotaManagerConfig
-import org.apache.kafka.server.quota.{ThrottleCallback, ThrottledChannel}
+import org.apache.kafka.server.quota.{QuotaType, ThrottleCallback, 
ThrottledChannel}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{BeforeEach, Test}
 
@@ -52,7 +52,7 @@ class ThrottledChannelExpirationTest {
 
   @Test
   def testCallbackInvocationAfterExpiration(): Unit = {
-    val clientMetrics = new ClientQuotaManager(new ClientQuotaManagerConfig(), 
metrics, QuotaType.Produce, time, "")
+    val clientMetrics = new ClientQuotaManager(new ClientQuotaManagerConfig(), 
metrics, QuotaType.PRODUCE, time, "")
 
     val delayQueue = new DelayQueue[ThrottledChannel]()
     val reaper = new clientMetrics.ThrottledChannelReaper(delayQueue, "")
diff --git a/core/src/test/scala/unit/kafka/utils/QuotaUtilsTest.scala 
b/core/src/test/scala/unit/kafka/utils/QuotaUtilsTest.scala
deleted file mode 100755
index 8d1741aff81..00000000000
--- a/core/src/test/scala/unit/kafka/utils/QuotaUtilsTest.scala
+++ /dev/null
@@ -1,134 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.utils
-
-import java.util.concurrent.TimeUnit
-import org.apache.kafka.common.MetricName
-import org.apache.kafka.common.metrics.{KafkaMetric, MetricConfig, Quota, 
QuotaViolationException}
-import org.apache.kafka.common.metrics.stats.{Rate, Value}
-import org.apache.kafka.server.util.MockTime
-
-import scala.jdk.CollectionConverters._
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.Test
-
-class QuotaUtilsTest {
-
-  private val time = new MockTime
-  private val numSamples = 10
-  private val sampleWindowSec = 1
-  private val maxThrottleTimeMs = 500
-  private val metricName = new MetricName("test-metric", "groupA", "testA", 
Map.empty.asJava)
-
-  @Test
-  def testThrottleTimeObservedRateEqualsQuota(): Unit = {
-    val numSamples = 10
-    val observedValue = 16.5
-
-    assertEquals(0, throttleTime(observedValue, observedValue, numSamples))
-
-    // should be independent of window size
-    assertEquals(0, throttleTime(observedValue, observedValue, numSamples + 1))
-  }
-
-  @Test
-  def testThrottleTimeObservedRateBelowQuota(): Unit = {
-    val observedValue = 16.5
-    val quota = 20.4
-    assertTrue(throttleTime(observedValue, quota, numSamples) < 0)
-
-    // should be independent of window size
-    assertTrue(throttleTime(observedValue, quota, numSamples + 1) < 0)
-  }
-
-  @Test
-  def testThrottleTimeObservedRateAboveQuota(): Unit = {
-    val quota = 50.0
-    val observedValue = 100.0
-    assertEquals(2000, throttleTime(observedValue, quota, 3))
-  }
-
-  @Test
-  def testBoundedThrottleTimeObservedRateEqualsQuota(): Unit = {
-    val observedValue = 18.2
-    assertEquals(0, boundedThrottleTime(observedValue, observedValue, 
numSamples, maxThrottleTimeMs))
-
-    // should be independent of window size
-    assertEquals(0, boundedThrottleTime(observedValue, observedValue, 
numSamples + 1, maxThrottleTimeMs))
-  }
-
-  @Test
-  def testBoundedThrottleTimeObservedRateBelowQuota(): Unit = {
-    val observedValue = 16.5
-    val quota = 22.4
-
-    assertTrue(boundedThrottleTime(observedValue, quota, numSamples, 
maxThrottleTimeMs) < 0)
-
-    // should be independent of window size
-    assertTrue(boundedThrottleTime(observedValue, quota, numSamples + 1, 
maxThrottleTimeMs) < 0)
-  }
-
-  @Test
-  def testBoundedThrottleTimeObservedRateAboveQuotaBelowLimit(): Unit = {
-    val quota = 50.0
-    val observedValue = 55.0
-    assertEquals(100, boundedThrottleTime(observedValue, quota, 2, 
maxThrottleTimeMs))
-  }
-
-  @Test
-  def testBoundedThrottleTimeObservedRateAboveQuotaAboveLimit(): Unit = {
-    val quota = 50.0
-    val observedValue = 100.0
-    assertEquals(maxThrottleTimeMs, boundedThrottleTime(observedValue, quota, 
numSamples, maxThrottleTimeMs))
-  }
-
-  @Test
-  def testThrottleTimeThrowsExceptionIfProvidedNonRateMetric(): Unit = {
-    val testMetric = new KafkaMetric(new Object(), metricName, new Value(), 
new MetricConfig, time)
-
-    assertThrows(classOf[IllegalArgumentException], () => 
QuotaUtils.throttleTime(new QuotaViolationException(testMetric, 10.0, 20.0), 
time.milliseconds))
-  }
-
-  @Test
-  def testBoundedThrottleTimeThrowsExceptionIfProvidedNonRateMetric(): Unit = {
-    val testMetric = new KafkaMetric(new Object(), metricName, new Value(), 
new MetricConfig, time)
-
-    assertThrows(classOf[IllegalArgumentException], () => 
QuotaUtils.boundedThrottleTime(new QuotaViolationException(testMetric, 10.0, 
20.0),
-      maxThrottleTimeMs, time.milliseconds))
-  }
-
-  // the `metric` passed into the returned QuotaViolationException will return 
windowSize = 'numSamples' - 1
-  private def quotaViolationException(observedValue: Double, quota: Double, 
numSamples: Int): QuotaViolationException = {
-    val metricConfig = new MetricConfig()
-      .timeWindow(sampleWindowSec, TimeUnit.SECONDS)
-      .samples(numSamples)
-      .quota(new Quota(quota, true))
-    val metric = new KafkaMetric(new Object(), metricName, new Rate(), 
metricConfig, time)
-    new QuotaViolationException(metric, observedValue, quota)
-  }
-
-  private def throttleTime(observedValue: Double, quota: Double, numSamples: 
Int): Long = {
-    val e = quotaViolationException(observedValue, quota, numSamples)
-    QuotaUtils.throttleTime(e, time.milliseconds)
-  }
-
-  private def boundedThrottleTime(observedValue: Double, quota: Double, 
numSamples: Int, maxThrottleTime: Long): Long = {
-    val e = quotaViolationException(observedValue, quota, numSamples)
-    QuotaUtils.boundedThrottleTime(e, maxThrottleTime, time.milliseconds)
-  }
-}
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/quota/QuotaType.java 
b/server-common/src/main/java/org/apache/kafka/server/quota/QuotaType.java
new file mode 100644
index 00000000000..e1454d6ab01
--- /dev/null
+++ b/server-common/src/main/java/org/apache/kafka/server/quota/QuotaType.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.quota;
+
+/**
+ * The kinds of quota. Each constant carries a display name, and
+ * {@link #toString()} returns that display name rather than the constant
+ * identifier (for example {@code CONTROLLER_MUTATION} prints as
+ * "ControllerMutation").
+ */
+public enum QuotaType {
+    FETCH("Fetch"),
+    PRODUCE("Produce"),
+    REQUEST("Request"),
+    CONTROLLER_MUTATION("ControllerMutation"),
+    LEADER_REPLICATION("LeaderReplication"),
+    FOLLOWER_REPLICATION("FollowerReplication"),
+    ALTER_LOG_DIRS_REPLICATION("AlterLogDirsReplication"),
+    RLM_COPY("RLMCopy"),
+    RLM_FETCH("RLMFetch");
+
+    /** Display name returned by {@link #toString()}. */
+    private final String name;
+
+    /**
+     * @param name display name for this quota type
+     */
+    QuotaType(String name) {
+        this.name = name;
+    }
+
+    /**
+     * Converts a quota type to the {@link ClientQuotaType} constant of the
+     * same name. Only FETCH, PRODUCE, REQUEST and CONTROLLER_MUTATION have a
+     * counterpart.
+     *
+     * @param quotaType the quota type to convert
+     * @return the matching ClientQuotaType
+     * @throws IllegalArgumentException if quotaType is any other constant
+     * @throws NullPointerException if quotaType is null (switch on null)
+     */
+    public static ClientQuotaType toClientQuotaType(QuotaType quotaType) {
+        switch (quotaType) {
+            case FETCH: return ClientQuotaType.FETCH;
+            case PRODUCE: return ClientQuotaType.PRODUCE;
+            case REQUEST: return ClientQuotaType.REQUEST;
+            case CONTROLLER_MUTATION: return 
ClientQuotaType.CONTROLLER_MUTATION;
+            default: throw new IllegalArgumentException("Not a client quota 
type: " + quotaType);
+        }
+    }
+
+    /**
+     * @return the display name given to the constructor, not the constant
+     *         identifier
+     */
+    @Override
+    public String toString() {
+        return name;
+    }
+}
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/quota/QuotaUtils.java 
b/server-common/src/main/java/org/apache/kafka/server/quota/QuotaUtils.java
new file mode 100644
index 00000000000..4b9e7401af0
--- /dev/null
+++ b/server-common/src/main/java/org/apache/kafka/server/quota/QuotaUtils.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.quota;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.QuotaViolationException;
+import org.apache.kafka.common.metrics.stats.Rate;
+
+/**
+ * Helper functions related to quotas: static methods that turn a
+ * QuotaViolationException on a Rate metric into a throttle delay.
+ */
+public class QuotaUtils {
+
+    /**
+     * Calculates the amount of time needed to bring the observed rate within
+     * quota, assuming that no new metrics are recorded.
+     * <p>
+     * If O is the observed rate and T is the target rate over a window of W,
+     * to bring O down to T we need to add a delay of X to W such that
+     * O * W / (W + X) = T. Solving for X, we get X = (O - T)/T * W.
+     * <p>
+     * The result is not clamped at zero: it is 0 when the observed value
+     * equals the bound and negative when it is below the bound. A bound of 0
+     * is not guarded against; the division is floating-point, so it yields an
+     * infinite or NaN intermediate value instead of throwing.
+     *
+     * @param e quota violation supplying the observed value, the bound and
+     *          the metric whose window is used
+     * @param timeMs current time in milliseconds
+     * @return Delay in milliseconds, rounded to the nearest long
+     * @throws IllegalArgumentException if the measurable of the violated
+     *         metric is not a Rate
+     */
+    public static long throttleTime(QuotaViolationException e, long timeMs) {
+        double difference = e.value() - e.bound();
+        // Use the precise window used by the rate calculation
+        double throttleTimeMs = difference / e.bound() * 
windowSize(e.metric(), timeMs);
+        return Math.round(throttleTimeMs);
+    }
+
+    /**
+     * Calculates the amount of time needed to bring the observed rate within
+     * quota using the same algorithm as {@code throttleTime()}, but caps the
+     * returned value at the given maxThrottleTime. Only the upper end is
+     * capped; a zero or negative delay is returned unchanged.
+     *
+     * @param e quota violation passed through to {@code throttleTime()}
+     * @param maxThrottleTime largest delay that may be returned
+     * @param timeMs current time in milliseconds
+     * @return the smaller of the computed delay and maxThrottleTime
+     * @throws IllegalArgumentException if the measurable of the violated
+     *         metric is not a Rate
+     */
+    public static long boundedThrottleTime(QuotaViolationException e, long 
maxThrottleTime, long timeMs) {
+        return Math.min(throttleTime(e, timeMs), maxThrottleTime);
+    }
+
+    /**
+     * Returns the window size of the given metric, as reported by its Rate
+     * for the metric's own config at the given time.
+     *
+     * @param metric metric with measurable of type Rate
+     * @param timeMs current time in milliseconds
+     * @return the value of {@code Rate.windowSize(metric.config(), timeMs)}
+     * @throws IllegalArgumentException if given measurable is not Rate
+     */
+    private static long windowSize(KafkaMetric metric, long timeMs) {
+        return measurableAsRate(metric.metricName(), 
metric.measurable()).windowSize(metric.config(), timeMs);
+    }
+
+    /**
+     * Casts the provided Measurable to Rate.
+     *
+     * @param name metric name, used only in the exception message
+     * @param measurable the measurable to cast
+     * @return the same instance, typed as Rate
+     * @throws IllegalArgumentException if given measurable is not Rate
+     */
+    private static Rate measurableAsRate(MetricName name, Measurable 
measurable) {
+        if (measurable instanceof Rate) {
+            return (Rate) measurable;
+        } else {
+            throw new IllegalArgumentException("Metric " + name + " is not a 
Rate metric, value " + measurable);
+        }
+    }
+
+}
diff --git 
a/server-common/src/test/java/org/apache/kafka/server/quota/QuotaUtilsTest.java 
b/server-common/src/test/java/org/apache/kafka/server/quota/QuotaUtilsTest.java
new file mode 100644
index 00000000000..d0c1c699cbe
--- /dev/null
+++ 
b/server-common/src/test/java/org/apache/kafka/server/quota/QuotaUtilsTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.quota;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Quota;
+import org.apache.kafka.common.metrics.QuotaViolationException;
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.metrics.stats.Value;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Unit tests for {@link QuotaUtils#throttleTime(QuotaViolationException, long)} and
+ * {@link QuotaUtils#boundedThrottleTime(QuotaViolationException, long, long)}.
+ */
+public class QuotaUtilsTest {
+
+    private final Time time = new MockTime();
+    // Default number of samples for tests that do not depend on a specific window size
+    private final int numSamples = 10;
+    // Cap passed to boundedThrottleTime()
+    private final int maxThrottleTimeMs = 500;
+    private final MetricName metricName = new MetricName("test-metric", "groupA", "testA", Collections.emptyMap());
+
+    @Test
+    public void testThrottleTimeObservedRateEqualsQuota() {
+        double observedValue = 16.5;
+
+        assertEquals(0, throttleTime(observedValue, observedValue, numSamples));
+
+        // should be independent of window size
+        assertEquals(0, throttleTime(observedValue, observedValue, numSamples + 1));
+    }
+
+    @Test
+    public void testThrottleTimeObservedRateBelowQuota() {
+        double observedValue = 16.5;
+        double quota = 20.4;
+        assertTrue(throttleTime(observedValue, quota, numSamples) < 0);
+
+        // should be independent of window size
+        assertTrue(throttleTime(observedValue, quota, numSamples + 1) < 0);
+    }
+
+    @Test
+    public void testThrottleTimeObservedRateAboveQuota() {
+        double quota = 50.0;
+        double observedValue = 100.0;
+        // 3 samples give a 2 second window; being 100% over quota throttles for the whole window
+        assertEquals(2000, throttleTime(observedValue, quota, 3));
+    }
+
+    @Test
+    public void testBoundedThrottleTimeObservedRateEqualsQuota() {
+        double observedValue = 18.2;
+        assertEquals(0, boundedThrottleTime(observedValue, observedValue, numSamples, maxThrottleTimeMs));
+
+        // should be independent of window size
+        assertEquals(0, boundedThrottleTime(observedValue, observedValue, numSamples + 1, maxThrottleTimeMs));
+    }
+
+    @Test
+    public void testBoundedThrottleTimeObservedRateBelowQuota() {
+        double observedValue = 16.5;
+        double quota = 22.4;
+
+        assertTrue(boundedThrottleTime(observedValue, quota, numSamples, maxThrottleTimeMs) < 0);
+
+        // should be independent of window size
+        assertTrue(boundedThrottleTime(observedValue, quota, numSamples + 1, maxThrottleTimeMs) < 0);
+    }
+
+    @Test
+    public void testBoundedThrottleTimeObservedRateAboveQuotaBelowLimit() {
+        double quota = 50.0;
+        double observedValue = 55.0;
+        // 2 samples give a 1 second window; being 10% over quota yields 100 ms, below the cap
+        assertEquals(100, boundedThrottleTime(observedValue, quota, 2, maxThrottleTimeMs));
+    }
+
+    @Test
+    public void testBoundedThrottleTimeObservedRateAboveQuotaAboveLimit() {
+        double quota = 50.0;
+        double observedValue = 100.0;
+        // the unbounded throttle time (100% over quota across a 9 second window) exceeds the cap
+        assertEquals(maxThrottleTimeMs, boundedThrottleTime(observedValue, quota, numSamples, maxThrottleTimeMs));
+    }
+
+    @Test
+    public void testThrottleTimeThrowsExceptionIfProvidedNonRateMetric() {
+        KafkaMetric testMetric = nonRateMetric();
+
+        assertThrows(IllegalArgumentException.class,
+                () -> QuotaUtils.throttleTime(new QuotaViolationException(testMetric, 10.0, 20.0), time.milliseconds()));
+    }
+
+    @Test
+    public void testBoundedThrottleTimeThrowsExceptionIfProvidedNonRateMetric() {
+        KafkaMetric testMetric = nonRateMetric();
+
+        assertThrows(IllegalArgumentException.class,
+                () -> QuotaUtils.boundedThrottleTime(new QuotaViolationException(testMetric, 10.0, 20.0), maxThrottleTimeMs, time.milliseconds()));
+    }
+
+    // Builds a metric backed by a Value instead of a Rate, which QuotaUtils is expected to reject
+    private KafkaMetric nonRateMetric() {
+        return new KafkaMetric(new Object(), metricName, new Value(), new MetricConfig(), time);
+    }
+
+    // The metric in the returned QuotaViolationException reports a window size of
+    // 'numSamples' - 1 sample windows (each one second long here); the expected values
+    // in the tests above are derived from that window.
+    private QuotaViolationException quotaViolationException(double observedValue, double quota, int numSamples) {
+        int sampleWindowSec = 1;
+        MetricConfig metricConfig = new MetricConfig()
+                .timeWindow(sampleWindowSec, TimeUnit.SECONDS)
+                .samples(numSamples)
+                .quota(new Quota(quota, true));
+        KafkaMetric metric = new KafkaMetric(new Object(), metricName, new Rate(), metricConfig, time);
+        return new QuotaViolationException(metric, observedValue, quota);
+    }
+
+    // Runs QuotaUtils.throttleTime() against a Rate metric configured with the given number of samples
+    private long throttleTime(double observedValue, double quota, int numSamples) {
+        QuotaViolationException e = quotaViolationException(observedValue, quota, numSamples);
+        return QuotaUtils.throttleTime(e, time.milliseconds());
+    }
+
+    // Runs QuotaUtils.boundedThrottleTime() against a Rate metric configured with the given number of samples
+    private long boundedThrottleTime(double observedValue, double quota, int numSamples, long maxThrottleTime) {
+        QuotaViolationException e = quotaViolationException(observedValue, quota, numSamples);
+        return QuotaUtils.boundedThrottleTime(e, maxThrottleTime, time.milliseconds());
+    }
+}
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/other/ReplicationQuotasTestRig.java
 
b/tools/src/test/java/org/apache/kafka/tools/other/ReplicationQuotasTestRig.java
index cd3cb496906..510c31231a9 100644
--- 
a/tools/src/test/java/org/apache/kafka/tools/other/ReplicationQuotasTestRig.java
+++ 
b/tools/src/test/java/org/apache/kafka/tools/other/ReplicationQuotasTestRig.java
@@ -20,7 +20,6 @@ import kafka.log.UnifiedLog;
 import kafka.server.KafkaConfig;
 import kafka.server.KafkaServer;
 import kafka.server.QuorumTestHarness;
-import kafka.server.QuotaType;
 import kafka.utils.EmptyTestInfo;
 import kafka.utils.TestUtils;
 
@@ -37,6 +36,7 @@ import 
org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.quota.QuotaType;
 import org.apache.kafka.tools.reassign.ReassignPartitionsCommand;
 
 import org.apache.log4j.PropertyConfigurator;
@@ -349,14 +349,14 @@ public class ReplicationQuotasTestRig {
 
         void printRateMetrics() {
             for (KafkaServer broker : servers) {
-                double leaderRate = measuredRate(broker, 
QuotaType.LeaderReplication$.MODULE$);
+                double leaderRate = measuredRate(broker, 
QuotaType.LEADER_REPLICATION);
                 if (broker.config().brokerId() == 100)
                     LOGGER.info("waiting... Leader rate on 101 is " + 
leaderRate);
                 record(leaderRates, broker.config().brokerId(), leaderRate);
                 if (leaderRate > 0)
                     LOGGER.trace("Leader Rate on " + 
broker.config().brokerId() + " is " + leaderRate);
 
-                double followerRate = measuredRate(broker, 
QuotaType.FollowerReplication$.MODULE$);
+                double followerRate = measuredRate(broker, 
QuotaType.FOLLOWER_REPLICATION);
                 record(followerRates, broker.config().brokerId(), 
followerRate);
                 if (followerRate > 0)
                     LOGGER.trace("Follower Rate on " + 
broker.config().brokerId() + " is " + followerRate);

Reply via email to