This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 1841c07d4af KAFKA-17449 Move Quota classes to server-common module
(#17060)
1841c07d4af is described below
commit 1841c07d4af1daa55f5f332e7c844738c12e7f7c
Author: Mickael Maison <[email protected]>
AuthorDate: Sat Aug 31 06:41:34 2024 +0200
KAFKA-17449 Move Quota classes to server-common module (#17060)
Reviewers: Chia-Ping Tsai <[email protected]>
---
checkstyle/import-control.xml | 1 +
.../java/kafka/log/remote/RemoteLogManager.java | 6 +-
.../kafka/log/remote/quota/RLMQuotaManager.java | 4 +-
.../main/scala/kafka/network/SocketServer.scala | 1 +
.../scala/kafka/server/ClientQuotaManager.scala | 4 +-
.../kafka/server/ClientRequestQuotaManager.scala | 11 +-
.../server/ControllerMutationQuotaManager.scala | 8 +-
.../src/main/scala/kafka/server/QuotaFactory.scala | 39 +-----
.../kafka/server/ReplicationQuotaManager.scala | 4 +-
core/src/main/scala/kafka/utils/QuotaUtils.scala | 75 -----------
.../log/remote/quota/RLMQuotaManagerTest.java | 5 +-
.../integration/kafka/api/BaseQuotaTest.scala | 17 +--
.../kafka/api/CustomQuotaCallbackTest.scala | 8 +-
.../kafka/api/PlaintextConsumerTest.scala | 21 +--
.../unit/kafka/server/ClientQuotaManagerTest.scala | 22 ++--
.../server/ClientRequestQuotaManagerTest.scala | 4 +-
.../ControllerMutationQuotaManagerTest.scala | 4 +-
.../kafka/server/ControllerMutationQuotaTest.scala | 3 +-
.../kafka/server/ReplicationQuotaManagerTest.scala | 10 +-
.../unit/kafka/server/ReplicationQuotasTest.scala | 4 +-
.../scala/unit/kafka/server/RequestQuotaTest.scala | 19 +--
.../server/ThrottledChannelExpirationTest.scala | 4 +-
.../scala/unit/kafka/utils/QuotaUtilsTest.scala | 134 -------------------
.../org/apache/kafka/server/quota/QuotaType.java | 50 +++++++
.../org/apache/kafka/server/quota/QuotaUtils.java | 79 ++++++++++++
.../apache/kafka/server/quota/QuotaUtilsTest.java | 143 +++++++++++++++++++++
.../tools/other/ReplicationQuotasTestRig.java | 6 +-
27 files changed, 363 insertions(+), 323 deletions(-)
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 0842f9189c2..285163816f2 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -293,6 +293,7 @@
<allow pkg="org.apache.kafka.server.common" />
<allow pkg="org.apache.kafka.server.log.remote.metadata.storage" />
<allow pkg="org.apache.kafka.server.log.remote.storage" />
+ <allow pkg="org.apache.kafka.server.quota" />
<allow pkg="org.apache.kafka.clients" />
<allow pkg="org.apache.kafka.clients.admin" />
<allow pkg="org.apache.kafka.clients.producer" />
diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index 3f4da991fe6..7606f7f7f67 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -22,7 +22,6 @@ import kafka.log.UnifiedLog;
import kafka.log.remote.quota.RLMQuotaManager;
import kafka.log.remote.quota.RLMQuotaManagerConfig;
import kafka.log.remote.quota.RLMQuotaMetrics;
-import kafka.server.QuotaType;
import kafka.server.StopPartition;
import org.apache.kafka.common.KafkaException;
@@ -64,6 +63,7 @@ import
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
import org.apache.kafka.server.metrics.KafkaMetricsGroup;
+import org.apache.kafka.server.quota.QuotaType;
import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile;
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
import org.apache.kafka.storage.internals.log.AbortedTxn;
@@ -291,12 +291,12 @@ public class RemoteLogManager implements Closeable {
}
RLMQuotaManager createRLMCopyQuotaManager() {
- return new RLMQuotaManager(copyQuotaManagerConfig(rlmConfig), metrics,
QuotaType.RLMCopy$.MODULE$,
+ return new RLMQuotaManager(copyQuotaManagerConfig(rlmConfig), metrics,
QuotaType.RLM_COPY,
"Tracking copy byte-rate for Remote Log Manager", time);
}
RLMQuotaManager createRLMFetchQuotaManager() {
- return new RLMQuotaManager(fetchQuotaManagerConfig(rlmConfig),
metrics, QuotaType.RLMFetch$.MODULE$,
+ return new RLMQuotaManager(fetchQuotaManagerConfig(rlmConfig),
metrics, QuotaType.RLM_FETCH,
"Tracking fetch byte-rate for Remote Log Manager", time);
}
diff --git a/core/src/main/java/kafka/log/remote/quota/RLMQuotaManager.java
b/core/src/main/java/kafka/log/remote/quota/RLMQuotaManager.java
index 0f695a21e63..eb677c7fee4 100644
--- a/core/src/main/java/kafka/log/remote/quota/RLMQuotaManager.java
+++ b/core/src/main/java/kafka/log/remote/quota/RLMQuotaManager.java
@@ -16,9 +16,7 @@
*/
package kafka.log.remote.quota;
-import kafka.server.QuotaType;
import kafka.server.SensorAccess;
-import kafka.utils.QuotaUtils;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.KafkaMetric;
@@ -29,6 +27,8 @@ import
org.apache.kafka.common.metrics.QuotaViolationException;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.SimpleRate;
import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.server.quota.QuotaType;
+import org.apache.kafka.server.quota.QuotaUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala
b/core/src/main/scala/kafka/network/SocketServer.scala
index d8545105689..a20d52f33f1 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -48,6 +48,7 @@ import org.apache.kafka.network.{ConnectionQuotaEntity,
ConnectionThrottledExcep
import org.apache.kafka.security.CredentialProvider
import org.apache.kafka.server.config.QuotaConfigs
import org.apache.kafka.server.metrics.KafkaMetricsGroup
+import org.apache.kafka.server.quota.QuotaUtils
import org.apache.kafka.server.util.FutureUtils
import org.slf4j.event.Level
diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala
b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
index bd12a51d3f3..144b9d2cfce 100644
--- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
@@ -21,7 +21,7 @@ import java.util.concurrent.{ConcurrentHashMap, DelayQueue,
TimeUnit}
import java.util.concurrent.locks.ReentrantReadWriteLock
import kafka.network.RequestChannel
import kafka.server.ClientQuotaManager._
-import kafka.utils.{Logging, QuotaUtils}
+import kafka.utils.Logging
import org.apache.kafka.common.{Cluster, MetricName}
import org.apache.kafka.common.metrics._
import org.apache.kafka.common.metrics.Metrics
@@ -29,7 +29,7 @@ import org.apache.kafka.common.metrics.stats.{Avg,
CumulativeSum, Rate}
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.{Sanitizer, Time}
import org.apache.kafka.server.config.{ClientQuotaManagerConfig,
ZooKeeperInternals}
-import org.apache.kafka.server.quota.{ClientQuotaCallback, ClientQuotaEntity,
ClientQuotaType, ThrottleCallback, ThrottledChannel}
+import org.apache.kafka.server.quota.{ClientQuotaCallback, ClientQuotaEntity,
ClientQuotaType, QuotaType, QuotaUtils, ThrottleCallback, ThrottledChannel}
import org.apache.kafka.server.util.ShutdownableThread
import org.apache.kafka.network.Session
diff --git a/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala
b/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala
index 437d6c371a2..248a7cb5125 100644
--- a/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala
@@ -18,13 +18,12 @@ package kafka.server
import java.util.concurrent.TimeUnit
import kafka.network.RequestChannel
-import kafka.utils.QuotaUtils
import org.apache.kafka.common.MetricName
import org.apache.kafka.common.metrics._
import org.apache.kafka.common.metrics.stats.Rate
import org.apache.kafka.common.utils.Time
import org.apache.kafka.server.config.ClientQuotaManagerConfig
-import org.apache.kafka.server.quota.ClientQuotaCallback
+import org.apache.kafka.server.quota.{ClientQuotaCallback, QuotaType,
QuotaUtils}
import scala.jdk.CollectionConverters._
@@ -34,7 +33,7 @@ object ClientRequestQuotaManager {
// create once.
private val DefaultInactiveExemptSensorExpirationTimeSeconds: Long =
Long.MaxValue
- private val ExemptSensorName = "exempt-" + QuotaType.Request
+ private val ExemptSensorName = "exempt-" + QuotaType.REQUEST
}
class ClientRequestQuotaManager(private val config: ClientQuotaManagerConfig,
@@ -42,11 +41,11 @@ class ClientRequestQuotaManager(private val config:
ClientQuotaManagerConfig,
private val time: Time,
private val threadNamePrefix: String,
private val quotaCallback:
Option[ClientQuotaCallback])
- extends ClientQuotaManager(config, metrics, QuotaType.Request, time,
threadNamePrefix, quotaCallback) {
+ extends ClientQuotaManager(config, metrics, QuotaType.REQUEST, time,
threadNamePrefix, quotaCallback) {
private val maxThrottleTimeMs =
TimeUnit.SECONDS.toMillis(this.config.quotaWindowSizeSeconds)
private val exemptMetricName = metrics.metricName("exempt-request-time",
- QuotaType.Request.toString, "Tracking exempt-request-time utilization
percentage")
+ QuotaType.REQUEST.toString, "Tracking exempt-request-time utilization
percentage")
val exemptSensor: Sensor =
getOrCreateSensor(ClientRequestQuotaManager.ExemptSensorName,
ClientRequestQuotaManager.DefaultInactiveExemptSensorExpirationTimeSeconds,
@@ -85,7 +84,7 @@ class ClientRequestQuotaManager(private val config:
ClientQuotaManagerConfig,
}
override protected def clientQuotaMetricName(quotaMetricTags: Map[String,
String]): MetricName = {
- metrics.metricName("request-time", QuotaType.Request.toString,
+ metrics.metricName("request-time", QuotaType.REQUEST.toString,
"Tracking request-time per user/client-id",
quotaMetricTags.asJava)
}
diff --git
a/core/src/main/scala/kafka/server/ControllerMutationQuotaManager.scala
b/core/src/main/scala/kafka/server/ControllerMutationQuotaManager.scala
index e1d38eb65ee..1a644e30d9a 100644
--- a/core/src/main/scala/kafka/server/ControllerMutationQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ControllerMutationQuotaManager.scala
@@ -27,7 +27,7 @@ import org.apache.kafka.common.metrics.stats.TokenBucket
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.utils.Time
import org.apache.kafka.network.Session
-import org.apache.kafka.server.quota.ClientQuotaCallback
+import org.apache.kafka.server.quota.{ClientQuotaCallback, QuotaType}
import org.apache.kafka.server.config.ClientQuotaManagerConfig
import scala.jdk.CollectionConverters._
@@ -166,16 +166,16 @@ class ControllerMutationQuotaManager(private val config:
ClientQuotaManagerConfi
private val time: Time,
private val threadNamePrefix: String,
private val quotaCallback:
Option[ClientQuotaCallback])
- extends ClientQuotaManager(config, metrics, QuotaType.ControllerMutation,
time, threadNamePrefix, quotaCallback) {
+ extends ClientQuotaManager(config, metrics, QuotaType.CONTROLLER_MUTATION,
time, threadNamePrefix, quotaCallback) {
override protected def clientQuotaMetricName(quotaMetricTags: Map[String,
String]): MetricName = {
- metrics.metricName("tokens", QuotaType.ControllerMutation.toString,
+ metrics.metricName("tokens", QuotaType.CONTROLLER_MUTATION.toString,
"Tracking remaining tokens in the token bucket per user/client-id",
quotaMetricTags.asJava)
}
private def clientRateMetricName(quotaMetricTags: Map[String, String]):
MetricName = {
- metrics.metricName("mutation-rate", QuotaType.ControllerMutation.toString,
+ metrics.metricName("mutation-rate", QuotaType.CONTROLLER_MUTATION.toString,
"Tracking mutation-rate per user/client-id",
quotaMetricTags.asJava)
}
diff --git a/core/src/main/scala/kafka/server/QuotaFactory.scala
b/core/src/main/scala/kafka/server/QuotaFactory.scala
index f613463774a..c79ae37fc95 100644
--- a/core/src/main/scala/kafka/server/QuotaFactory.scala
+++ b/core/src/main/scala/kafka/server/QuotaFactory.scala
@@ -16,38 +16,13 @@
*/
package kafka.server
-import kafka.server.QuotaType._
import kafka.utils.Logging
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.server.quota.ClientQuotaCallback
+import org.apache.kafka.server.quota.{ClientQuotaCallback, QuotaType}
import org.apache.kafka.common.utils.Time
-import org.apache.kafka.server.config.{ClientQuotaManagerConfig,
ReplicationQuotaManagerConfig, QuotaConfigs}
-import org.apache.kafka.server.quota.ClientQuotaType
+import org.apache.kafka.server.config.{ClientQuotaManagerConfig, QuotaConfigs,
ReplicationQuotaManagerConfig}
-object QuotaType {
- case object Fetch extends QuotaType
- case object Produce extends QuotaType
- case object Request extends QuotaType
- case object ControllerMutation extends QuotaType
- case object LeaderReplication extends QuotaType
- case object FollowerReplication extends QuotaType
- case object AlterLogDirsReplication extends QuotaType
- case object RLMCopy extends QuotaType
- case object RLMFetch extends QuotaType
-
- def toClientQuotaType(quotaType: QuotaType): ClientQuotaType = {
- quotaType match {
- case QuotaType.Fetch => ClientQuotaType.FETCH
- case QuotaType.Produce => ClientQuotaType.PRODUCE
- case QuotaType.Request => ClientQuotaType.REQUEST
- case QuotaType.ControllerMutation => ClientQuotaType.CONTROLLER_MUTATION
- case _ => throw new IllegalArgumentException(s"Not a client quota type:
$quotaType")
- }
- }
-}
-
-sealed trait QuotaType
object QuotaFactory extends Logging {
@@ -79,14 +54,14 @@ object QuotaFactory extends Logging {
val clientQuotaCallback =
Option(cfg.getConfiguredInstance(QuotaConfigs.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG,
classOf[ClientQuotaCallback]))
QuotaManagers(
- new ClientQuotaManager(clientConfig(cfg), metrics, Fetch, time,
threadNamePrefix, clientQuotaCallback),
- new ClientQuotaManager(clientConfig(cfg), metrics, Produce, time,
threadNamePrefix, clientQuotaCallback),
+ new ClientQuotaManager(clientConfig(cfg), metrics, QuotaType.FETCH,
time, threadNamePrefix, clientQuotaCallback),
+ new ClientQuotaManager(clientConfig(cfg), metrics, QuotaType.PRODUCE,
time, threadNamePrefix, clientQuotaCallback),
new ClientRequestQuotaManager(clientConfig(cfg), metrics, time,
threadNamePrefix, clientQuotaCallback),
new ControllerMutationQuotaManager(clientControllerMutationConfig(cfg),
metrics, time,
threadNamePrefix, clientQuotaCallback),
- new ReplicationQuotaManager(replicationConfig(cfg), metrics,
LeaderReplication, time),
- new ReplicationQuotaManager(replicationConfig(cfg), metrics,
FollowerReplication, time),
- new ReplicationQuotaManager(alterLogDirsReplicationConfig(cfg), metrics,
AlterLogDirsReplication, time),
+ new ReplicationQuotaManager(replicationConfig(cfg), metrics,
QuotaType.LEADER_REPLICATION, time),
+ new ReplicationQuotaManager(replicationConfig(cfg), metrics,
QuotaType.FOLLOWER_REPLICATION, time),
+ new ReplicationQuotaManager(alterLogDirsReplicationConfig(cfg), metrics,
QuotaType.ALTER_LOG_DIRS_REPLICATION, time),
clientQuotaCallback
)
}
diff --git a/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala
b/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala
index 43f508f4a36..ce50b99ff54 100644
--- a/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala
@@ -18,18 +18,16 @@ package kafka.server
import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
import java.util.concurrent.locks.ReentrantReadWriteLock
-
import scala.collection.Seq
-
import kafka.server.Constants._
import kafka.utils.CoreUtils._
import kafka.utils.Logging
import org.apache.kafka.common.metrics._
-
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.metrics.stats.SimpleRate
import org.apache.kafka.common.utils.Time
import org.apache.kafka.server.config.ReplicationQuotaManagerConfig
+import org.apache.kafka.server.quota.QuotaType
trait ReplicaQuota {
def record(value: Long): Unit
diff --git a/core/src/main/scala/kafka/utils/QuotaUtils.scala
b/core/src/main/scala/kafka/utils/QuotaUtils.scala
deleted file mode 100755
index 93d5cdebcc3..00000000000
--- a/core/src/main/scala/kafka/utils/QuotaUtils.scala
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.utils
-
-import org.apache.kafka.common.MetricName
-import org.apache.kafka.common.metrics.{KafkaMetric, Measurable,
QuotaViolationException}
-import org.apache.kafka.common.metrics.stats.Rate
-
-/**
- * Helper functions related to quotas
- */
-object QuotaUtils {
-
- /**
- * This calculates the amount of time needed to bring the observed rate
within quota
- * assuming that no new metrics are recorded.
- *
- * If O is the observed rate and T is the target rate over a window of W, to
bring O down to T,
- * we need to add a delay of X to W such that O * W / (W + X) = T.
- * Solving for X, we get X = (O - T)/T * W.
- *
- * @param timeMs current time in milliseconds
- * @return Delay in milliseconds
- */
- def throttleTime(e: QuotaViolationException, timeMs: Long): Long = {
- val difference = e.value - e.bound
- // Use the precise window used by the rate calculation
- val throttleTimeMs = difference / e.bound * windowSize(e.metric, timeMs)
- Math.round(throttleTimeMs)
- }
-
- /**
- * Calculates the amount of time needed to bring the observed rate within
quota using the same algorithm as
- * throttleTime() utility method but the returned value is capped to given
maxThrottleTime
- */
- def boundedThrottleTime(e: QuotaViolationException, maxThrottleTime: Long,
timeMs: Long): Long = {
- math.min(throttleTime(e, timeMs), maxThrottleTime)
- }
-
- /**
- * Returns window size of the given metric
- *
- * @param metric metric with measurable of type Rate
- * @param timeMs current time in milliseconds
- * @throws IllegalArgumentException if given measurable is not Rate
- */
- private def windowSize(metric: KafkaMetric, timeMs: Long): Long =
- measurableAsRate(metric.metricName,
metric.measurable).windowSize(metric.config, timeMs)
-
- /**
- * Casts provided Measurable to Rate
- * @throws IllegalArgumentException if given measurable is not Rate
- */
- private def measurableAsRate(name: MetricName, measurable: Measurable): Rate
= {
- measurable match {
- case r: Rate => r
- case _ => throw new IllegalArgumentException(s"Metric $name is not a
Rate metric, value $measurable")
- }
- }
-}
diff --git a/core/src/test/java/kafka/log/remote/quota/RLMQuotaManagerTest.java
b/core/src/test/java/kafka/log/remote/quota/RLMQuotaManagerTest.java
index ce6c3ce3ecc..53236e78d44 100644
--- a/core/src/test/java/kafka/log/remote/quota/RLMQuotaManagerTest.java
+++ b/core/src/test/java/kafka/log/remote/quota/RLMQuotaManagerTest.java
@@ -16,14 +16,13 @@
*/
package kafka.log.remote.quota;
-import kafka.server.QuotaType;
-
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Quota;
import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.server.quota.QuotaType;
import org.junit.jupiter.api.Test;
@@ -39,7 +38,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class RLMQuotaManagerTest {
private final MockTime time = new MockTime();
private final Metrics metrics = new Metrics(new MetricConfig(),
Collections.emptyList(), time);
- private static final QuotaType QUOTA_TYPE = QuotaType.RLMFetch$.MODULE$;
+ private static final QuotaType QUOTA_TYPE = QuotaType.RLM_FETCH;
private static final String DESCRIPTION = "Tracking byte rate";
@Test
diff --git a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
index e27fa687f11..5f9f9aac361 100644
--- a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
@@ -20,7 +20,7 @@ import java.util.concurrent.TimeUnit
import java.util.{Collections, Properties}
import com.yammer.metrics.core.{Histogram, Meter}
import kafka.api.QuotaTestClients._
-import kafka.server.{ClientQuotaManager, KafkaBroker, QuotaType}
+import kafka.server.{ClientQuotaManager, KafkaBroker}
import kafka.utils.TestUtils
import org.apache.kafka.clients.admin.Admin
import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig}
@@ -33,8 +33,9 @@ import org.apache.kafka.common.quota.ClientQuotaAlteration
import org.apache.kafka.common.quota.ClientQuotaEntity
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
-import org.apache.kafka.server.config.{ServerConfigs, QuotaConfigs}
+import org.apache.kafka.server.config.{QuotaConfigs, ServerConfigs}
import org.apache.kafka.server.metrics.KafkaYammerMetrics
+import org.apache.kafka.server.quota.QuotaType
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{BeforeEach, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
@@ -178,7 +179,7 @@ abstract class BaseQuotaTest extends IntegrationTestHarness
{
while ((!throttled || quotaTestClients.exemptRequestMetric == null ||
metricValue(quotaTestClients.exemptRequestMetric) <= 0)
&& System.currentTimeMillis < endTimeMs) {
consumer.poll(Duration.ofMillis(100L))
- val throttleMetric = quotaTestClients.throttleMetric(QuotaType.Request,
consumerClientId)
+ val throttleMetric = quotaTestClients.throttleMetric(QuotaType.REQUEST,
consumerClientId)
throttled = throttleMetric != null && metricValue(throttleMetric) > 0
}
@@ -221,7 +222,7 @@ abstract class QuotaTestClients(topic: String,
new ErrorLoggingCallback(topic, null, null, true))
numProduced += 1
do {
- val metric = throttleMetric(QuotaType.Produce, producerClientId)
+ val metric = throttleMetric(QuotaType.PRODUCE, producerClientId)
throttled = metric != null && metricValue(metric) > 0
} while (!future.isDone && (!throttled || waitForRequestCompletion))
} while (numProduced < maxRecords && !throttled)
@@ -237,7 +238,7 @@ abstract class QuotaTestClients(topic: String,
val startMs = System.currentTimeMillis
do {
numConsumed += consumer.poll(Duration.ofMillis(100L)).count
- val metric = throttleMetric(QuotaType.Fetch, consumerClientId)
+ val metric = throttleMetric(QuotaType.FETCH, consumerClientId)
throttled = metric != null && metricValue(metric) > 0
} while (numConsumed < maxRecords && !throttled &&
System.currentTimeMillis < startMs + timeoutMs)
@@ -266,7 +267,7 @@ abstract class QuotaTestClients(topic: String,
def verifyProduceThrottle(expectThrottle: Boolean, verifyClientMetric:
Boolean = true,
verifyRequestChannelMetric: Boolean = true): Unit
= {
- verifyThrottleTimeMetric(QuotaType.Produce, producerClientId,
expectThrottle)
+ verifyThrottleTimeMetric(QuotaType.PRODUCE, producerClientId,
expectThrottle)
if (verifyRequestChannelMetric)
verifyThrottleTimeRequestChannelMetric(ApiKeys.PRODUCE, "",
producerClientId, expectThrottle)
if (verifyClientMetric)
@@ -275,7 +276,7 @@ abstract class QuotaTestClients(topic: String,
def verifyConsumeThrottle(expectThrottle: Boolean, verifyClientMetric:
Boolean = true,
verifyRequestChannelMetric: Boolean = true): Unit
= {
- verifyThrottleTimeMetric(QuotaType.Fetch, consumerClientId, expectThrottle)
+ verifyThrottleTimeMetric(QuotaType.FETCH, consumerClientId, expectThrottle)
if (verifyRequestChannelMetric)
verifyThrottleTimeRequestChannelMetric(ApiKeys.FETCH, "Consumer",
consumerClientId, expectThrottle)
if (verifyClientMetric)
@@ -318,7 +319,7 @@ abstract class QuotaTestClients(topic: String,
}
def exemptRequestMetric: KafkaMetric = {
- val metricName = leaderNode.metrics.metricName("exempt-request-time",
QuotaType.Request.toString, "")
+ val metricName = leaderNode.metrics.metricName("exempt-request-time",
QuotaType.REQUEST.toString, "")
leaderNode.metrics.metrics.get(metricName)
}
diff --git
a/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala
b/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala
index 7403f8e6e05..7f1eaa6c300 100644
--- a/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala
+++ b/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala
@@ -294,10 +294,10 @@ class CustomQuotaCallbackTest extends
IntegrationTestHarness with SaslSetup {
leaderNode.metrics.removeSensor(s"${quotaType}ThrottleTime-$sensorSuffix")
leaderNode.metrics.removeSensor(s"$quotaType-$sensorSuffix")
}
- removeSensors(QuotaType.Produce, producerClientId)
- removeSensors(QuotaType.Fetch, consumerClientId)
- removeSensors(QuotaType.Request, producerClientId)
- removeSensors(QuotaType.Request, consumerClientId)
+ removeSensors(QuotaType.PRODUCE, producerClientId)
+ removeSensors(QuotaType.FETCH, consumerClientId)
+ removeSensors(QuotaType.REQUEST, producerClientId)
+ removeSensors(QuotaType.REQUEST, consumerClientId)
}
private def quotaEntityName(userGroup: String): String = s"${userGroup}_"
diff --git
a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index 8a00feacb0b..638e8d232d1 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -16,7 +16,7 @@ import java.time.Duration
import java.util
import java.util.Arrays.asList
import java.util.{Collections, Locale, Optional, Properties}
-import kafka.server.{KafkaBroker, QuotaType}
+import kafka.server.KafkaBroker
import kafka.utils.{TestInfoUtils, TestUtils}
import org.apache.kafka.clients.admin.{NewPartitions, NewTopic}
import org.apache.kafka.clients.consumer._
@@ -27,6 +27,7 @@ import org.apache.kafka.common.header.Headers
import org.apache.kafka.common.record.{CompressionType, TimestampType}
import org.apache.kafka.common.serialization._
import org.apache.kafka.common.{MetricName, TopicPartition}
+import org.apache.kafka.server.quota.QuotaType
import org.apache.kafka.test.{MockConsumerInterceptor, MockProducerInterceptor}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Timeout
@@ -629,15 +630,15 @@ class PlaintextConsumerTest extends BaseConsumerTest {
"client-id", clientId)
assertNull(broker.metrics.metric(metricName), "Metric should not have
been created " + metricName)
}
- brokers.foreach(assertNoMetric(_, "byte-rate", QuotaType.Produce,
producerClientId))
- brokers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Produce,
producerClientId))
- brokers.foreach(assertNoMetric(_, "byte-rate", QuotaType.Fetch,
consumerClientId))
- brokers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Fetch,
consumerClientId))
-
- brokers.foreach(assertNoMetric(_, "request-time", QuotaType.Request,
producerClientId))
- brokers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Request,
producerClientId))
- brokers.foreach(assertNoMetric(_, "request-time", QuotaType.Request,
consumerClientId))
- brokers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Request,
consumerClientId))
+ brokers.foreach(assertNoMetric(_, "byte-rate", QuotaType.PRODUCE,
producerClientId))
+ brokers.foreach(assertNoMetric(_, "throttle-time", QuotaType.PRODUCE,
producerClientId))
+ brokers.foreach(assertNoMetric(_, "byte-rate", QuotaType.FETCH,
consumerClientId))
+ brokers.foreach(assertNoMetric(_, "throttle-time", QuotaType.FETCH,
consumerClientId))
+
+ brokers.foreach(assertNoMetric(_, "request-time", QuotaType.REQUEST,
producerClientId))
+ brokers.foreach(assertNoMetric(_, "throttle-time", QuotaType.REQUEST,
producerClientId))
+ brokers.foreach(assertNoMetric(_, "request-time", QuotaType.REQUEST,
consumerClientId))
+ brokers.foreach(assertNoMetric(_, "throttle-time", QuotaType.REQUEST,
consumerClientId))
}
@ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
index 59e206304a3..4a4107226dc 100644
--- a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
@@ -17,12 +17,12 @@
package kafka.server
import java.net.InetAddress
-import kafka.server.QuotaType._
import org.apache.kafka.common.metrics.Quota
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.Sanitizer
import org.apache.kafka.server.config.{ClientQuotaManagerConfig,
ZooKeeperInternals}
import org.apache.kafka.network.Session
+import org.apache.kafka.server.quota.QuotaType
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
@@ -30,7 +30,7 @@ class ClientQuotaManagerTest extends
BaseClientQuotaManagerTest {
private val config = new ClientQuotaManagerConfig()
private def testQuotaParsing(config: ClientQuotaManagerConfig, client1:
UserClient, client2: UserClient, randomClient: UserClient, defaultConfigClient:
UserClient): Unit = {
- val clientQuotaManager = new ClientQuotaManager(config, metrics, Produce,
time, "")
+ val clientQuotaManager = new ClientQuotaManager(config, metrics,
QuotaType.PRODUCE, time, "")
try {
// Case 1: Update the quota. Assert that the new quota value is returned
@@ -160,7 +160,7 @@ class ClientQuotaManagerTest extends
BaseClientQuotaManagerTest {
def testGetMaxValueInQuotaWindowWithNonDefaultQuotaWindow(): Unit = {
val numFullQuotaWindows = 3 // 3 seconds window (vs. 10 seconds default)
val nonDefaultConfig = new ClientQuotaManagerConfig(numFullQuotaWindows +
1)
- val clientQuotaManager = new ClientQuotaManager(nonDefaultConfig, metrics,
Fetch, time, "")
+ val clientQuotaManager = new ClientQuotaManager(nonDefaultConfig, metrics,
QuotaType.FETCH, time, "")
val userSession = new Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE,
"userA"), InetAddress.getLocalHost)
try {
@@ -179,7 +179,7 @@ class ClientQuotaManagerTest extends
BaseClientQuotaManagerTest {
def testSetAndRemoveDefaultUserQuota(): Unit = {
// quotaTypesEnabled will be QuotaTypes.NoQuotas initially
val clientQuotaManager = new ClientQuotaManager(new
ClientQuotaManagerConfig(),
- metrics, Produce, time, "")
+ metrics, QuotaType.PRODUCE, time, "")
try {
// no quota set yet, should not throttle
@@ -201,7 +201,7 @@ class ClientQuotaManagerTest extends
BaseClientQuotaManagerTest {
def testSetAndRemoveUserQuota(): Unit = {
// quotaTypesEnabled will be QuotaTypes.NoQuotas initially
val clientQuotaManager = new ClientQuotaManager(new
ClientQuotaManagerConfig(),
- metrics, Produce, time, "")
+ metrics, QuotaType.PRODUCE, time, "")
try {
// Set <user> quota config
@@ -220,7 +220,7 @@ class ClientQuotaManagerTest extends
BaseClientQuotaManagerTest {
def testSetAndRemoveUserClientQuota(): Unit = {
// quotaTypesEnabled will be QuotaTypes.NoQuotas initially
val clientQuotaManager = new ClientQuotaManager(new
ClientQuotaManagerConfig(),
- metrics, Produce, time, "")
+ metrics, QuotaType.PRODUCE, time, "")
try {
// Set <user, client-id> quota config
@@ -238,7 +238,7 @@ class ClientQuotaManagerTest extends
BaseClientQuotaManagerTest {
@Test
def testQuotaConfigPrecedence(): Unit = {
val clientQuotaManager = new ClientQuotaManager(new
ClientQuotaManagerConfig(),
- metrics, Produce, time, "")
+ metrics, QuotaType.PRODUCE, time, "")
try {
clientQuotaManager.updateQuota(Some(ZooKeeperInternals.DEFAULT_STRING),
None, None, Some(new Quota(1000, true)))
@@ -301,7 +301,7 @@ class ClientQuotaManagerTest extends
BaseClientQuotaManagerTest {
@Test
def testQuotaViolation(): Unit = {
- val clientQuotaManager = new ClientQuotaManager(config, metrics, Produce,
time, "")
+ val clientQuotaManager = new ClientQuotaManager(config, metrics,
QuotaType.PRODUCE, time, "")
val queueSizeMetric =
metrics.metrics().get(metrics.metricName("queue-size", "Produce", ""))
try {
clientQuotaManager.updateQuota(None,
Some(ZooKeeperInternals.DEFAULT_STRING),
Some(ZooKeeperInternals.DEFAULT_STRING),
@@ -350,7 +350,7 @@ class ClientQuotaManagerTest extends
BaseClientQuotaManagerTest {
@Test
def testExpireThrottleTimeSensor(): Unit = {
- val clientQuotaManager = new ClientQuotaManager(config, metrics, Produce,
time, "")
+ val clientQuotaManager = new ClientQuotaManager(config, metrics,
QuotaType.PRODUCE, time, "")
try {
clientQuotaManager.updateQuota(None,
Some(ZooKeeperInternals.DEFAULT_STRING),
Some(ZooKeeperInternals.DEFAULT_STRING),
Some(new Quota(500, true)))
@@ -372,7 +372,7 @@ class ClientQuotaManagerTest extends
BaseClientQuotaManagerTest {
@Test
def testExpireQuotaSensors(): Unit = {
- val clientQuotaManager = new ClientQuotaManager(config, metrics, Produce,
time, "")
+ val clientQuotaManager = new ClientQuotaManager(config, metrics,
QuotaType.PRODUCE, time, "")
try {
clientQuotaManager.updateQuota(None,
Some(ZooKeeperInternals.DEFAULT_STRING),
Some(ZooKeeperInternals.DEFAULT_STRING),
Some(new Quota(500, true)))
@@ -398,7 +398,7 @@ class ClientQuotaManagerTest extends
BaseClientQuotaManagerTest {
@Test
def testClientIdNotSanitized(): Unit = {
- val clientQuotaManager = new ClientQuotaManager(config, metrics, Produce,
time, "")
+ val clientQuotaManager = new ClientQuotaManager(config, metrics,
QuotaType.PRODUCE, time, "")
val clientId = "client@#$%"
try {
clientQuotaManager.updateQuota(None,
Some(ZooKeeperInternals.DEFAULT_STRING),
Some(ZooKeeperInternals.DEFAULT_STRING),
diff --git
a/core/src/test/scala/unit/kafka/server/ClientRequestQuotaManagerTest.scala
b/core/src/test/scala/unit/kafka/server/ClientRequestQuotaManagerTest.scala
index 31a3e6e753d..ec09f3a7d6d 100644
--- a/core/src/test/scala/unit/kafka/server/ClientRequestQuotaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ClientRequestQuotaManagerTest.scala
@@ -16,9 +16,9 @@
*/
package kafka.server
-import kafka.server.QuotaType.Request
import org.apache.kafka.common.metrics.Quota
import org.apache.kafka.server.config.ClientQuotaManagerConfig
+import org.apache.kafka.server.quota.QuotaType
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
@@ -29,7 +29,7 @@ class ClientRequestQuotaManagerTest extends
BaseClientQuotaManagerTest {
def testRequestPercentageQuotaViolation(): Unit = {
val clientRequestQuotaManager = new ClientRequestQuotaManager(config,
metrics, time, "", None)
clientRequestQuotaManager.updateQuota(Some("ANONYMOUS"),
Some("test-client"), Some("test-client"), Some(Quota.upperBound(1)))
- val queueSizeMetric =
metrics.metrics().get(metrics.metricName("queue-size", Request.toString, ""))
+ val queueSizeMetric =
metrics.metrics().get(metrics.metricName("queue-size",
QuotaType.REQUEST.toString, ""))
def millisToPercent(millis: Double) = millis * 1000 * 1000 *
ClientRequestQuotaManager.NanosToPercentagePerSecond
try {
// We have 10 second windows. Make sure that there is no quota violation
diff --git
a/core/src/test/scala/unit/kafka/server/ControllerMutationQuotaManagerTest.scala
b/core/src/test/scala/unit/kafka/server/ControllerMutationQuotaManagerTest.scala
index dea6d554f1a..8fa4a290cc7 100644
---
a/core/src/test/scala/unit/kafka/server/ControllerMutationQuotaManagerTest.scala
+++
b/core/src/test/scala/unit/kafka/server/ControllerMutationQuotaManagerTest.scala
@@ -17,7 +17,6 @@
package kafka.server
import java.util.concurrent.TimeUnit
-import kafka.server.QuotaType.ControllerMutation
import org.apache.kafka.common.errors.ThrottlingQuotaExceededException
import org.apache.kafka.common.metrics.MetricConfig
import org.apache.kafka.common.metrics.Metrics
@@ -26,6 +25,7 @@ import org.apache.kafka.common.metrics.QuotaViolationException
import org.apache.kafka.common.metrics.stats.TokenBucket
import org.apache.kafka.common.utils.MockTime
import org.apache.kafka.server.config.ClientQuotaManagerConfig
+import org.apache.kafka.server.quota.QuotaType
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertFalse
@@ -148,7 +148,7 @@ class ControllerMutationQuotaManagerTest extends
BaseClientQuotaManagerTest {
quotaManager.updateQuota(Some(User), Some(ClientId), Some(ClientId),
Some(Quota.upperBound(10)))
val queueSizeMetric = metrics.metrics().get(
- metrics.metricName("queue-size", ControllerMutation.toString, ""))
+ metrics.metricName("queue-size",
QuotaType.CONTROLLER_MUTATION.toString, ""))
// Verify that there is no quota violation if we remain under the quota.
for (_ <- 0 until 10) {
diff --git
a/core/src/test/scala/unit/kafka/server/ControllerMutationQuotaTest.scala
b/core/src/test/scala/unit/kafka/server/ControllerMutationQuotaTest.scala
index 7886b851b97..e0f8be559f7 100644
--- a/core/src/test/scala/unit/kafka/server/ControllerMutationQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ControllerMutationQuotaTest.scala
@@ -43,6 +43,7 @@ import
org.apache.kafka.common.security.auth.AuthenticationContext
import org.apache.kafka.common.security.auth.KafkaPrincipal
import
org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
import org.apache.kafka.server.config.{QuotaConfigs, ServerConfigs}
+import org.apache.kafka.server.quota.QuotaType
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.test.{TestUtils => JTestUtils}
import org.junit.jupiter.api.Assertions.assertEquals
@@ -406,7 +407,7 @@ class ControllerMutationQuotaTest extends BaseRequestTest {
else brokers.head.metrics
val metricName = metrics.metricName(
"tokens",
- QuotaType.ControllerMutation.toString,
+ QuotaType.CONTROLLER_MUTATION.toString,
"Tracking remaining tokens in the token bucket per user/client-id",
Map(DefaultTags.User -> user, DefaultTags.ClientId -> "").asJava)
Option(metrics.metric(metricName))
diff --git
a/core/src/test/scala/unit/kafka/server/ReplicationQuotaManagerTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicationQuotaManagerTest.scala
index 7f2346d36a0..ff90883a4a4 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicationQuotaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicationQuotaManagerTest.scala
@@ -17,11 +17,11 @@
package kafka.server
import java.util.Collections
-import kafka.server.QuotaType._
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.metrics.{MetricConfig, Metrics, Quota}
import org.apache.kafka.common.utils.MockTime
import org.apache.kafka.server.config.ReplicationQuotaManagerConfig
+import org.apache.kafka.server.quota.QuotaType
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
import org.junit.jupiter.api.{AfterEach, Test}
@@ -38,7 +38,7 @@ class ReplicationQuotaManagerTest {
@Test
def shouldThrottleOnlyDefinedReplicas(): Unit = {
- val quota = new ReplicationQuotaManager(new
ReplicationQuotaManagerConfig(), metrics, QuotaType.Fetch, time)
+ val quota = new ReplicationQuotaManager(new
ReplicationQuotaManagerConfig(), metrics, QuotaType.FETCH, time)
quota.markThrottled("topic1", Seq(1, 2, 3))
assertTrue(quota.isThrottled(tp1(1)))
@@ -49,7 +49,7 @@ class ReplicationQuotaManagerTest {
@Test
def shouldExceedQuotaThenReturnBackBelowBoundAsTimePasses(): Unit = {
- val quota = new ReplicationQuotaManager(new
ReplicationQuotaManagerConfig(10, 1), metrics, LeaderReplication, time)
+ val quota = new ReplicationQuotaManager(new
ReplicationQuotaManagerConfig(10, 1), metrics, QuotaType.LEADER_REPLICATION,
time)
//Given
quota.updateQuota(new Quota(100, true))
@@ -103,14 +103,14 @@ class ReplicationQuotaManagerTest {
}
def rate(metrics: Metrics): Double = {
- val metricName = metrics.metricName("byte-rate",
LeaderReplication.toString, "Tracking byte-rate for " + LeaderReplication)
+ val metricName = metrics.metricName("byte-rate",
QuotaType.LEADER_REPLICATION.toString, "Tracking byte-rate for " +
QuotaType.LEADER_REPLICATION)
val leaderThrottledRate =
metrics.metrics.asScala(metricName).metricValue.asInstanceOf[Double]
leaderThrottledRate
}
@Test
def shouldSupportWildcardThrottledReplicas(): Unit = {
- val quota = new ReplicationQuotaManager(new
ReplicationQuotaManagerConfig(), metrics, LeaderReplication, time)
+ val quota = new ReplicationQuotaManager(new
ReplicationQuotaManagerConfig(), metrics, QuotaType.LEADER_REPLICATION, time)
//When
quota.markThrottled("MyTopic")
diff --git a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
index c9984c24baa..2dad987bc61 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
@@ -21,7 +21,6 @@ import java.util.AbstractMap.SimpleImmutableEntry
import java.util.{Collections, Properties}
import java.util.Map.Entry
import kafka.server.KafkaConfig.fromProps
-import kafka.server.QuotaType._
import kafka.utils.TestUtils._
import kafka.utils.CoreUtils._
import org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET
@@ -37,6 +36,7 @@ import
org.apache.kafka.common.security.auth.SecurityProtocol.PLAINTEXT
import org.apache.kafka.controller.ControllerRequestContextUtil
import org.apache.kafka.server.common.{Features, MetadataVersion}
import org.apache.kafka.server.config.QuotaConfigs
+import org.apache.kafka.server.quota.QuotaType
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.params.ParameterizedTest
@@ -198,7 +198,7 @@ class ReplicationQuotasTest extends QuorumTestHarness {
// In a short test the brokers can be read unfairly, so assert against the
average
val rateUpperBound = throttle * 1.1
val rateLowerBound = throttle * 0.5
- val rate = if (leaderThrottle) avRate(LeaderReplication, 100 to 105) else
avRate(FollowerReplication, 106 to 107)
+ val rate = if (leaderThrottle) avRate(QuotaType.LEADER_REPLICATION, 100 to
105) else avRate(QuotaType.FOLLOWER_REPLICATION, 106 to 107)
assertTrue(rate < rateUpperBound, s"Expected $rate < $rateUpperBound")
assertTrue(rate > rateLowerBound, s"Expected $rate > $rateLowerBound")
}
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index a98b7cb2ded..8be1a9787f5 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -47,6 +47,7 @@ import org.apache.kafka.metadata.authorizer.StandardAuthorizer
import org.apache.kafka.network.Session
import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext,
AuthorizationResult}
import org.apache.kafka.server.config.{QuotaConfigs, ServerConfigs}
+import org.apache.kafka.server.quota.QuotaType
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
@@ -229,7 +230,7 @@ class RequestQuotaTest extends BaseRequestTest {
def session(user: String): Session = new Session(new
KafkaPrincipal(KafkaPrincipal.USER_TYPE, user), null)
private def throttleTimeMetricValue(clientId: String): Double = {
- throttleTimeMetricValueForQuotaType(clientId, QuotaType.Request)
+ throttleTimeMetricValueForQuotaType(clientId, QuotaType.REQUEST)
}
private def throttleTimeMetricValueForQuotaType(clientId: String, quotaType:
QuotaType): Double = {
@@ -241,7 +242,7 @@ class RequestQuotaTest extends BaseRequestTest {
}
private def requestTimeMetricValue(clientId: String): Double = {
- val metricName = leaderNode.metrics.metricName("request-time",
QuotaType.Request.toString,
+ val metricName = leaderNode.metrics.metricName("request-time",
QuotaType.REQUEST.toString,
"", "user", "", "client-id", clientId)
val sensor =
leaderNode.quotaManagers.request.getOrCreateQuotaSensors(session("ANONYMOUS"),
clientId).quotaSensor
@@ -249,7 +250,7 @@ class RequestQuotaTest extends BaseRequestTest {
}
private def exemptRequestMetricValue: Double = {
- val metricName = leaderNode.metrics.metricName("exempt-request-time",
QuotaType.Request.toString, "")
+ val metricName = leaderNode.metrics.metricName("exempt-request-time",
QuotaType.REQUEST.toString, "")
metricValue(leaderNode.metrics.metrics.get(metricName),
leaderNode.quotaManagers.request.exemptSensor)
}
@@ -781,8 +782,8 @@ class RequestQuotaTest extends BaseRequestTest {
override def toString: String = {
val requestTime = requestTimeMetricValue(clientId)
val throttleTime = throttleTimeMetricValue(clientId)
- val produceThrottleTime = throttleTimeMetricValueForQuotaType(clientId,
QuotaType.Produce)
- val consumeThrottleTime = throttleTimeMetricValueForQuotaType(clientId,
QuotaType.Fetch)
+ val produceThrottleTime = throttleTimeMetricValueForQuotaType(clientId,
QuotaType.PRODUCE)
+ val consumeThrottleTime = throttleTimeMetricValueForQuotaType(clientId,
QuotaType.FETCH)
s"Client $clientId apiKey $apiKey requests $correlationId requestTime
$requestTime " +
s"throttleTime $throttleTime produceThrottleTime $produceThrottleTime
consumeThrottleTime $consumeThrottleTime"
}
@@ -826,9 +827,9 @@ class RequestQuotaTest extends BaseRequestTest {
val throttled = smallQuotaProducerClient.runUntil(_.throttleTimeMs > 0)
assertTrue(throttled, s"Response not throttled: $smallQuotaProducerClient")
- assertTrue(throttleTimeMetricValueForQuotaType(smallQuotaProducerClientId,
QuotaType.Produce) > 0,
+ assertTrue(throttleTimeMetricValueForQuotaType(smallQuotaProducerClientId,
QuotaType.PRODUCE) > 0,
s"Throttle time metrics for produce quota not updated:
$smallQuotaProducerClient")
- assertTrue(throttleTimeMetricValueForQuotaType(smallQuotaProducerClientId,
QuotaType.Request).isNaN,
+ assertTrue(throttleTimeMetricValueForQuotaType(smallQuotaProducerClientId,
QuotaType.REQUEST).isNaN,
s"Throttle time metrics for request quota updated:
$smallQuotaProducerClient")
}
@@ -839,9 +840,9 @@ class RequestQuotaTest extends BaseRequestTest {
val throttled = smallQuotaConsumerClient.runUntil(_.throttleTimeMs > 0)
assertTrue(throttled, s"Response not throttled:
$smallQuotaConsumerClientId")
- assertTrue(throttleTimeMetricValueForQuotaType(smallQuotaConsumerClientId,
QuotaType.Fetch) > 0,
+ assertTrue(throttleTimeMetricValueForQuotaType(smallQuotaConsumerClientId,
QuotaType.FETCH) > 0,
s"Throttle time metrics for consumer quota not updated:
$smallQuotaConsumerClient")
- assertTrue(throttleTimeMetricValueForQuotaType(smallQuotaConsumerClientId,
QuotaType.Request).isNaN,
+ assertTrue(throttleTimeMetricValueForQuotaType(smallQuotaConsumerClientId,
QuotaType.REQUEST).isNaN,
s"Throttle time metrics for request quota updated:
$smallQuotaConsumerClient")
}
diff --git
a/core/src/test/scala/unit/kafka/server/ThrottledChannelExpirationTest.scala
b/core/src/test/scala/unit/kafka/server/ThrottledChannelExpirationTest.scala
index 06def9021ce..6d8adef1f8b 100644
--- a/core/src/test/scala/unit/kafka/server/ThrottledChannelExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ThrottledChannelExpirationTest.scala
@@ -23,7 +23,7 @@ import java.util.concurrent.DelayQueue
import org.apache.kafka.common.metrics.MetricConfig
import org.apache.kafka.common.utils.MockTime
import org.apache.kafka.server.config.ClientQuotaManagerConfig
-import org.apache.kafka.server.quota.{ThrottleCallback, ThrottledChannel}
+import org.apache.kafka.server.quota.{QuotaType, ThrottleCallback,
ThrottledChannel}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{BeforeEach, Test}
@@ -52,7 +52,7 @@ class ThrottledChannelExpirationTest {
@Test
def testCallbackInvocationAfterExpiration(): Unit = {
- val clientMetrics = new ClientQuotaManager(new ClientQuotaManagerConfig(),
metrics, QuotaType.Produce, time, "")
+ val clientMetrics = new ClientQuotaManager(new ClientQuotaManagerConfig(),
metrics, QuotaType.PRODUCE, time, "")
val delayQueue = new DelayQueue[ThrottledChannel]()
val reaper = new clientMetrics.ThrottledChannelReaper(delayQueue, "")
diff --git a/core/src/test/scala/unit/kafka/utils/QuotaUtilsTest.scala
b/core/src/test/scala/unit/kafka/utils/QuotaUtilsTest.scala
deleted file mode 100755
index 8d1741aff81..00000000000
--- a/core/src/test/scala/unit/kafka/utils/QuotaUtilsTest.scala
+++ /dev/null
@@ -1,134 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.utils
-
-import java.util.concurrent.TimeUnit
-import org.apache.kafka.common.MetricName
-import org.apache.kafka.common.metrics.{KafkaMetric, MetricConfig, Quota,
QuotaViolationException}
-import org.apache.kafka.common.metrics.stats.{Rate, Value}
-import org.apache.kafka.server.util.MockTime
-
-import scala.jdk.CollectionConverters._
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.Test
-
-class QuotaUtilsTest {
-
- private val time = new MockTime
- private val numSamples = 10
- private val sampleWindowSec = 1
- private val maxThrottleTimeMs = 500
- private val metricName = new MetricName("test-metric", "groupA", "testA",
Map.empty.asJava)
-
- @Test
- def testThrottleTimeObservedRateEqualsQuota(): Unit = {
- val numSamples = 10
- val observedValue = 16.5
-
- assertEquals(0, throttleTime(observedValue, observedValue, numSamples))
-
- // should be independent of window size
- assertEquals(0, throttleTime(observedValue, observedValue, numSamples + 1))
- }
-
- @Test
- def testThrottleTimeObservedRateBelowQuota(): Unit = {
- val observedValue = 16.5
- val quota = 20.4
- assertTrue(throttleTime(observedValue, quota, numSamples) < 0)
-
- // should be independent of window size
- assertTrue(throttleTime(observedValue, quota, numSamples + 1) < 0)
- }
-
- @Test
- def testThrottleTimeObservedRateAboveQuota(): Unit = {
- val quota = 50.0
- val observedValue = 100.0
- assertEquals(2000, throttleTime(observedValue, quota, 3))
- }
-
- @Test
- def testBoundedThrottleTimeObservedRateEqualsQuota(): Unit = {
- val observedValue = 18.2
- assertEquals(0, boundedThrottleTime(observedValue, observedValue,
numSamples, maxThrottleTimeMs))
-
- // should be independent of window size
- assertEquals(0, boundedThrottleTime(observedValue, observedValue,
numSamples + 1, maxThrottleTimeMs))
- }
-
- @Test
- def testBoundedThrottleTimeObservedRateBelowQuota(): Unit = {
- val observedValue = 16.5
- val quota = 22.4
-
- assertTrue(boundedThrottleTime(observedValue, quota, numSamples,
maxThrottleTimeMs) < 0)
-
- // should be independent of window size
- assertTrue(boundedThrottleTime(observedValue, quota, numSamples + 1,
maxThrottleTimeMs) < 0)
- }
-
- @Test
- def testBoundedThrottleTimeObservedRateAboveQuotaBelowLimit(): Unit = {
- val quota = 50.0
- val observedValue = 55.0
- assertEquals(100, boundedThrottleTime(observedValue, quota, 2,
maxThrottleTimeMs))
- }
-
- @Test
- def testBoundedThrottleTimeObservedRateAboveQuotaAboveLimit(): Unit = {
- val quota = 50.0
- val observedValue = 100.0
- assertEquals(maxThrottleTimeMs, boundedThrottleTime(observedValue, quota,
numSamples, maxThrottleTimeMs))
- }
-
- @Test
- def testThrottleTimeThrowsExceptionIfProvidedNonRateMetric(): Unit = {
- val testMetric = new KafkaMetric(new Object(), metricName, new Value(),
new MetricConfig, time)
-
- assertThrows(classOf[IllegalArgumentException], () =>
QuotaUtils.throttleTime(new QuotaViolationException(testMetric, 10.0, 20.0),
time.milliseconds))
- }
-
- @Test
- def testBoundedThrottleTimeThrowsExceptionIfProvidedNonRateMetric(): Unit = {
- val testMetric = new KafkaMetric(new Object(), metricName, new Value(),
new MetricConfig, time)
-
- assertThrows(classOf[IllegalArgumentException], () =>
QuotaUtils.boundedThrottleTime(new QuotaViolationException(testMetric, 10.0,
20.0),
- maxThrottleTimeMs, time.milliseconds))
- }
-
- // the `metric` passed into the returned QuotaViolationException will return
windowSize = 'numSamples' - 1
- private def quotaViolationException(observedValue: Double, quota: Double,
numSamples: Int): QuotaViolationException = {
- val metricConfig = new MetricConfig()
- .timeWindow(sampleWindowSec, TimeUnit.SECONDS)
- .samples(numSamples)
- .quota(new Quota(quota, true))
- val metric = new KafkaMetric(new Object(), metricName, new Rate(),
metricConfig, time)
- new QuotaViolationException(metric, observedValue, quota)
- }
-
- private def throttleTime(observedValue: Double, quota: Double, numSamples:
Int): Long = {
- val e = quotaViolationException(observedValue, quota, numSamples)
- QuotaUtils.throttleTime(e, time.milliseconds)
- }
-
- private def boundedThrottleTime(observedValue: Double, quota: Double,
numSamples: Int, maxThrottleTime: Long): Long = {
- val e = quotaViolationException(observedValue, quota, numSamples)
- QuotaUtils.boundedThrottleTime(e, maxThrottleTime, time.milliseconds)
- }
-}
diff --git
a/server-common/src/main/java/org/apache/kafka/server/quota/QuotaType.java
b/server-common/src/main/java/org/apache/kafka/server/quota/QuotaType.java
new file mode 100644
index 00000000000..e1454d6ab01
--- /dev/null
+++ b/server-common/src/main/java/org/apache/kafka/server/quota/QuotaType.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.quota;
+
+public enum QuotaType {
+ FETCH("Fetch"),
+ PRODUCE("Produce"),
+ REQUEST("Request"),
+ CONTROLLER_MUTATION("ControllerMutation"),
+ LEADER_REPLICATION("LeaderReplication"),
+ FOLLOWER_REPLICATION("FollowerReplication"),
+ ALTER_LOG_DIRS_REPLICATION("AlterLogDirsReplication"),
+ RLM_COPY("RLMCopy"),
+ RLM_FETCH("RLMFetch");
+
+ private final String name;
+
+ QuotaType(String name) {
+ this.name = name;
+ }
+
+ public static ClientQuotaType toClientQuotaType(QuotaType quotaType) {
+ switch (quotaType) {
+ case FETCH: return ClientQuotaType.FETCH;
+ case PRODUCE: return ClientQuotaType.PRODUCE;
+ case REQUEST: return ClientQuotaType.REQUEST;
+ case CONTROLLER_MUTATION: return
ClientQuotaType.CONTROLLER_MUTATION;
+ default: throw new IllegalArgumentException("Not a client quota
type: " + quotaType);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return name;
+ }
+}
diff --git
a/server-common/src/main/java/org/apache/kafka/server/quota/QuotaUtils.java
b/server-common/src/main/java/org/apache/kafka/server/quota/QuotaUtils.java
new file mode 100644
index 00000000000..4b9e7401af0
--- /dev/null
+++ b/server-common/src/main/java/org/apache/kafka/server/quota/QuotaUtils.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.quota;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.QuotaViolationException;
+import org.apache.kafka.common.metrics.stats.Rate;
+
+/**
+ * Helper functions related to quotas
+ */
+public class QuotaUtils {
+
+ /**
+ * This calculates the amount of time needed to bring the observed rate
within quota
+ * assuming that no new metrics are recorded.
+ * <br/>
+ * If O is the observed rate and T is the target rate over a window of W,
to bring O down to T,
+ * we need to add a delay of X to W such that O * W / (W + X) = T.
+ * Solving for X, we get X = (O - T)/T * W.
+ *
+ * @param timeMs current time in milliseconds
+ * @return Delay in milliseconds
+ */
+ public static long throttleTime(QuotaViolationException e, long timeMs) {
+ double difference = e.value() - e.bound();
+ // Use the precise window used by the rate calculation
+ double throttleTimeMs = difference / e.bound() *
windowSize(e.metric(), timeMs);
+ return Math.round(throttleTimeMs);
+ }
+
+ /**
+ * Calculates the amount of time needed to bring the observed rate within
quota using the same algorithm as
+ * throttleTime() utility method but the returned value is capped to given
maxThrottleTime
+ */
+ public static long boundedThrottleTime(QuotaViolationException e, long
maxThrottleTime, long timeMs) {
+ return Math.min(throttleTime(e, timeMs), maxThrottleTime);
+ }
+
+ /**
+ * Returns window size of the given metric
+ *
+ * @param metric metric with measurable of type Rate
+ * @param timeMs current time in milliseconds
+ * @throws IllegalArgumentException if given measurable is not Rate
+ */
+ private static long windowSize(KafkaMetric metric, long timeMs) {
+ return measurableAsRate(metric.metricName(),
metric.measurable()).windowSize(metric.config(), timeMs);
+ }
+
+ /**
+ * Casts provided Measurable to Rate
+ * @throws IllegalArgumentException if given measurable is not Rate
+ */
+ private static Rate measurableAsRate(MetricName name, Measurable
measurable) {
+ if (measurable instanceof Rate) {
+ return (Rate) measurable;
+ } else {
+ throw new IllegalArgumentException("Metric " + name + " is not a
Rate metric, value " + measurable);
+ }
+ }
+
+}
diff --git
a/server-common/src/test/java/org/apache/kafka/server/quota/QuotaUtilsTest.java
b/server-common/src/test/java/org/apache/kafka/server/quota/QuotaUtilsTest.java
new file mode 100644
index 00000000000..d0c1c699cbe
--- /dev/null
+++
b/server-common/src/test/java/org/apache/kafka/server/quota/QuotaUtilsTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.quota;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Quota;
+import org.apache.kafka.common.metrics.QuotaViolationException;
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.metrics.stats.Value;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class QuotaUtilsTest {
+
+ private final Time time = new MockTime();
+ private final int numSamples = 10;
+ private final int maxThrottleTimeMs = 500;
+ private final MetricName metricName = new MetricName("test-metric",
"groupA", "testA", Collections.emptyMap());
+
+ @Test
+ public void testThrottleTimeObservedRateEqualsQuota() {
+ int numSamples = 10;
+ double observedValue = 16.5;
+
+ assertEquals(0, throttleTime(observedValue, observedValue,
numSamples));
+
+ // should be independent of window size
+ assertEquals(0, throttleTime(observedValue, observedValue, numSamples
+ 1));
+ }
+
+ @Test
+ public void testThrottleTimeObservedRateBelowQuota() {
+ double observedValue = 16.5;
+ double quota = 20.4;
+ assertTrue(throttleTime(observedValue, quota, numSamples) < 0);
+
+ // should be independent of window size
+ assertTrue(throttleTime(observedValue, quota, numSamples + 1) < 0);
+ }
+
+ @Test
+ public void testThrottleTimeObservedRateAboveQuota() {
+ double quota = 50.0;
+ double observedValue = 100.0;
+ assertEquals(2000, throttleTime(observedValue, quota, 3));
+ }
+
+ @Test
+ public void testBoundedThrottleTimeObservedRateEqualsQuota() {
+ double observedValue = 18.2;
+ assertEquals(0, boundedThrottleTime(observedValue, observedValue,
numSamples, maxThrottleTimeMs));
+
+ // should be independent of window size
+ assertEquals(0, boundedThrottleTime(observedValue, observedValue,
numSamples + 1, maxThrottleTimeMs));
+ }
+
+ @Test
+ public void testBoundedThrottleTimeObservedRateBelowQuota() {
+ double observedValue = 16.5;
+ double quota = 22.4;
+
+ assertTrue(boundedThrottleTime(observedValue, quota, numSamples,
maxThrottleTimeMs) < 0);
+
+ // should be independent of window size
+ assertTrue(boundedThrottleTime(observedValue, quota, numSamples + 1,
maxThrottleTimeMs) < 0);
+ }
+
+ @Test
+ public void testBoundedThrottleTimeObservedRateAboveQuotaBelowLimit() {
+ double quota = 50.0;
+ double observedValue = 55.0;
+ assertEquals(100, boundedThrottleTime(observedValue, quota, 2,
maxThrottleTimeMs));
+ }
+
+ @Test
+ public void testBoundedThrottleTimeObservedRateAboveQuotaAboveLimit() {
+ double quota = 50.0;
+ double observedValue = 100.0;
+ assertEquals(maxThrottleTimeMs, boundedThrottleTime(observedValue,
quota, numSamples, maxThrottleTimeMs));
+ }
+
+ @Test
+ public void testThrottleTimeThrowsExceptionIfProvidedNonRateMetric() {
+ KafkaMetric testMetric = new KafkaMetric(new Object(), metricName, new
Value(), new MetricConfig(), time);
+
+ assertThrows(IllegalArgumentException.class,
+ () -> QuotaUtils.throttleTime(new
QuotaViolationException(testMetric, 10.0, 20.0), time.milliseconds()));
+ }
+
+ @Test
+ public void
testBoundedThrottleTimeThrowsExceptionIfProvidedNonRateMetric() {
+ KafkaMetric testMetric = new KafkaMetric(new Object(), metricName, new
Value(), new MetricConfig(), time);
+
+ assertThrows(IllegalArgumentException.class,
+ () -> QuotaUtils.boundedThrottleTime(new
QuotaViolationException(testMetric, 10.0, 20.0), maxThrottleTimeMs,
time.milliseconds()));
+ }
+
+ // the `metric` passed into the returned QuotaViolationException will
return windowSize = 'numSamples' - 1
+ private QuotaViolationException quotaViolationException(double
observedValue, double quota, int numSamples) {
+ int sampleWindowSec = 1;
+ MetricConfig metricConfig = new MetricConfig()
+ .timeWindow(sampleWindowSec, TimeUnit.SECONDS)
+ .samples(numSamples)
+ .quota(new Quota(quota, true));
+ KafkaMetric metric = new KafkaMetric(new Object(), metricName, new
Rate(), metricConfig, time);
+ return new QuotaViolationException(metric, observedValue, quota);
+ }
+
+ private long throttleTime(double observedValue, double quota, int
numSamples) {
+ QuotaViolationException e = quotaViolationException(observedValue,
quota, numSamples);
+ return QuotaUtils.throttleTime(e, time.milliseconds());
+ }
+
+ private long boundedThrottleTime(double observedValue, double quota, int
numSamples, long maxThrottleTime) {
+ QuotaViolationException e = quotaViolationException(observedValue,
quota, numSamples);
+ return QuotaUtils.boundedThrottleTime(e, maxThrottleTime,
time.milliseconds());
+ }
+}
diff --git
a/tools/src/test/java/org/apache/kafka/tools/other/ReplicationQuotasTestRig.java
b/tools/src/test/java/org/apache/kafka/tools/other/ReplicationQuotasTestRig.java
index cd3cb496906..510c31231a9 100644
---
a/tools/src/test/java/org/apache/kafka/tools/other/ReplicationQuotasTestRig.java
+++
b/tools/src/test/java/org/apache/kafka/tools/other/ReplicationQuotasTestRig.java
@@ -20,7 +20,6 @@ import kafka.log.UnifiedLog;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.server.QuorumTestHarness;
-import kafka.server.QuotaType;
import kafka.utils.EmptyTestInfo;
import kafka.utils.TestUtils;
@@ -37,6 +36,7 @@ import
org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.quota.QuotaType;
import org.apache.kafka.tools.reassign.ReassignPartitionsCommand;
import org.apache.log4j.PropertyConfigurator;
@@ -349,14 +349,14 @@ public class ReplicationQuotasTestRig {
void printRateMetrics() {
for (KafkaServer broker : servers) {
- double leaderRate = measuredRate(broker,
QuotaType.LeaderReplication$.MODULE$);
+ double leaderRate = measuredRate(broker,
QuotaType.LEADER_REPLICATION);
if (broker.config().brokerId() == 100)
LOGGER.info("waiting... Leader rate on 101 is " +
leaderRate);
record(leaderRates, broker.config().brokerId(), leaderRate);
if (leaderRate > 0)
LOGGER.trace("Leader Rate on " +
broker.config().brokerId() + " is " + leaderRate);
- double followerRate = measuredRate(broker,
QuotaType.FollowerReplication$.MODULE$);
+ double followerRate = measuredRate(broker,
QuotaType.FOLLOWER_REPLICATION);
record(followerRates, broker.config().brokerId(),
followerRate);
if (followerRate > 0)
LOGGER.trace("Follower Rate on " +
broker.config().brokerId() + " is " + followerRate);