This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new e3817cac894 KAFKA-14351: Controller Mutation Quota for KRaft (#13116)
e3817cac894 is described below
commit e3817cac894140e0b26c09050b10ce653f7c3d97
Author: Ron Dagostino <[email protected]>
AuthorDate: Tue Mar 7 14:25:34 2023 -0500
KAFKA-14351: Controller Mutation Quota for KRaft (#13116)
Implement KIP-599 controller mutation quotas for the KRaft controller.
These quotas apply to create
topics, create partitions, and delete topic operations. They are specified
in terms of number of
partitions.
The approach taken here is to reuse the ControllerMutationQuotaManager that
is also used in ZK
mode. The quotas are implemented as Sensor objects and Sensor.checkQuotas
enforces the quota,
whereas Sensor.record notes that new partitions have been modified. While
ControllerApis handles
fetching the Sensor objects, we must make the final callback to check the
quotas from within
QuorumController. The reason is that only QuorumController knows the
final number of partitions
that must be modified. (As one example, up-to-date information about the
number of partitions that
will be deleted when a topic is deleted is really only available in
QuorumController.)
For quota enforcement, the logic is already in place. The KRaft controller
is expected to set the
throttle time in the response that is embedded in EnvelopeResponse, but it
does not actually apply
the throttle because there is no client connection to throttle. Instead,
the broker that forwarded
the request is expected to return the throttle value from the controller
and to throttle the client
connection. It also applies its own request quota, so the enforced/returned
quota is the maximum of
the two.
This PR also installs a DynamicConfigPublisher in ControllerServer. This
allows dynamic
configurations to be published on the controller. Previously, they could be
set, but they were not
applied. Note that we still don't have a good way to set node-level
configurations for isolated
controllers. However, this will allow us to set cluster configs (aka
default node configs) and have
them take effect on the controllers.
In a similar vein, this PR separates out the dynamic client quota publisher
logic used on the
broker into DynamicClientQuotaPublisher. We can now install this on both
BrokerServer and
ControllerServer. This makes dynamically configuring quotas (such as
controller mutation quotas)
possible.
Also add a ducktape test, controller_mutation_quota_test.py.
Reviewers: David Jacot <[email protected]>, Ismael Juma
<[email protected]>, Colin P. McCabe <[email protected]>
---
.../main/scala/kafka/server/ControllerApis.scala | 27 +++-
.../main/scala/kafka/server/ControllerServer.scala | 39 +++--
.../scala/kafka/server/DynamicBrokerConfig.scala | 99 ++++++++----
.../metadata/DynamicClientQuotaPublisher.scala | 82 ++++++++++
.../server/metadata/DynamicConfigPublisher.scala | 43 +++++-
core/src/test/java/kafka/test/MockController.java | 84 +++++++---
.../kafka/server/KRaftClusterTest.scala | 89 ++++++++++-
.../unit/kafka/server/ControllerApisTest.scala | 141 ++++++++++++++++-
.../kafka/server/DynamicBrokerConfigTest.scala | 155 ++++++++++++++++---
.../kafka/controller/ControllerRequestContext.java | 37 ++++-
.../apache/kafka/controller/QuorumController.java | 6 +-
.../controller/ReplicationControlManager.java | 57 +++++--
.../controller/ControllerRequestContextUtil.java | 23 ++-
.../controller/ReplicationControlManagerTest.java | 171 +++++++++++++++++----
.../tests/core/controller_mutation_quota_test.py | 142 +++++++++++++++++
15 files changed, 1040 insertions(+), 155 deletions(-)
diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala
b/core/src/main/scala/kafka/server/ControllerApis.scala
index 0964c07215c..b8f233fa94f 100644
--- a/core/src/main/scala/kafka/server/ControllerApis.scala
+++ b/core/src/main/scala/kafka/server/ControllerApis.scala
@@ -17,10 +17,11 @@
package kafka.server
-import java.util
+import java.{lang, util}
import java.util.{Collections, OptionalLong}
import java.util.Map.Entry
import java.util.concurrent.CompletableFuture
+import java.util.function.Consumer
import kafka.network.RequestChannel
import kafka.raft.RaftManager
import kafka.server.QuotaFactory.QuotaManagers
@@ -175,8 +176,10 @@ class ControllerApis(val requestChannel: RequestChannel,
def handleDeleteTopics(request: RequestChannel.Request):
CompletableFuture[Unit] = {
val deleteTopicsRequest = request.body[DeleteTopicsRequest]
+ val controllerMutationQuota =
quotas.controllerMutation.newQuotaFor(request, strictSinceVersion = 5)
val context = new ControllerRequestContext(request.context.header.data,
request.context.principal,
- requestTimeoutMsToDeadlineNs(time, deleteTopicsRequest.data.timeoutMs))
+ requestTimeoutMsToDeadlineNs(time, deleteTopicsRequest.data.timeoutMs),
+ controllerMutationQuotaRecorderFor(controllerMutationQuota))
val future = deleteTopics(context,
deleteTopicsRequest.data,
request.context.apiVersion,
@@ -184,7 +187,7 @@ class ControllerApis(val requestChannel: RequestChannel,
names => authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC,
names)(n => n),
names => authHelper.filterByAuthorized(request.context, DELETE, TOPIC,
names)(n => n))
future.handle[Unit] { (results, exception) =>
- requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
+
requestHelper.sendResponseMaybeThrottleWithControllerQuota(controllerMutationQuota,
request, throttleTimeMs => {
if (exception != null) {
deleteTopicsRequest.getErrorResponse(throttleTimeMs, exception)
} else {
@@ -339,8 +342,10 @@ class ControllerApis(val requestChannel: RequestChannel,
def handleCreateTopics(request: RequestChannel.Request):
CompletableFuture[Unit] = {
val createTopicsRequest = request.body[CreateTopicsRequest]
+ val controllerMutationQuota =
quotas.controllerMutation.newQuotaFor(request, strictSinceVersion = 6)
val context = new ControllerRequestContext(request.context.header.data,
request.context.principal,
- requestTimeoutMsToDeadlineNs(time, createTopicsRequest.data.timeoutMs))
+ requestTimeoutMsToDeadlineNs(time, createTopicsRequest.data.timeoutMs),
+ controllerMutationQuotaRecorderFor(controllerMutationQuota))
val future = createTopics(context,
createTopicsRequest.data,
authHelper.authorize(request.context, CREATE, CLUSTER, CLUSTER_NAME,
logIfDenied = false),
@@ -348,7 +353,7 @@ class ControllerApis(val requestChannel: RequestChannel,
names => authHelper.filterByAuthorized(request.context,
DESCRIBE_CONFIGS, TOPIC,
names, logIfDenied = false)(identity))
future.handle[Unit] { (result, exception) =>
- requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
+
requestHelper.sendResponseMaybeThrottleWithControllerQuota(controllerMutationQuota,
request, throttleTimeMs => {
if (exception != null) {
createTopicsRequest.getErrorResponse(throttleTimeMs, exception)
} else {
@@ -359,6 +364,12 @@ class ControllerApis(val requestChannel: RequestChannel,
}
}
+ private def controllerMutationQuotaRecorderFor(controllerMutationQuota:
ControllerMutationQuota) = {
+ new Consumer[lang.Integer]() {
+ override def accept(permits: lang.Integer): Unit =
controllerMutationQuota.record(permits.doubleValue())
+ }
+ }
+
def createTopics(
context: ControllerRequestContext,
request: CreateTopicsRequestData,
@@ -748,8 +759,10 @@ class ControllerApis(val requestChannel: RequestChannel,
authHelper.filterByAuthorized(request.context, ALTER, TOPIC, topics)(n
=> n)
}
val createPartitionsRequest = request.body[CreatePartitionsRequest]
+ val controllerMutationQuota =
quotas.controllerMutation.newQuotaFor(request, strictSinceVersion = 3)
val context = new ControllerRequestContext(request.context.header.data,
request.context.principal,
- requestTimeoutMsToDeadlineNs(time,
createPartitionsRequest.data.timeoutMs))
+ requestTimeoutMsToDeadlineNs(time,
createPartitionsRequest.data.timeoutMs),
+ controllerMutationQuotaRecorderFor(controllerMutationQuota))
val future = createPartitions(context,
createPartitionsRequest.data(),
filterAlterAuthorizedTopics)
@@ -757,7 +770,7 @@ class ControllerApis(val requestChannel: RequestChannel,
if (exception != null) {
requestHelper.handleError(request, exception)
} else {
- requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
+
requestHelper.sendResponseMaybeThrottleWithControllerQuota(controllerMutationQuota,
request, requestThrottleMs => {
val responseData = new CreatePartitionsResponseData().
setResults(responses).
setThrottleTimeMs(requestThrottleMs)
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala
b/core/src/main/scala/kafka/server/ControllerServer.scala
index 908e26a3020..b1a15d97741 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -24,8 +24,10 @@ import kafka.network.{DataPlaneAcceptor, SocketServer}
import kafka.raft.KafkaRaftManager
import kafka.security.CredentialProvider
import kafka.server.KafkaConfig.{AlterConfigPolicyClassNameProp,
CreateTopicPolicyClassNameProp}
-import kafka.server.KafkaRaftServer.BrokerRole
import kafka.server.QuotaFactory.QuotaManagers
+
+import scala.collection.immutable
+import kafka.server.metadata.{ClientQuotaMetadataManager,
DynamicClientQuotaPublisher, DynamicConfigPublisher}
import kafka.utils.{CoreUtils, Logging}
import kafka.zk.{KafkaZkClient, ZkMigrationClient}
import org.apache.kafka.common.config.ConfigException
@@ -102,9 +104,11 @@ class ControllerServer(
var alterConfigPolicy: Option[AlterConfigPolicy] = None
var controller: Controller = _
var quotaManagers: QuotaManagers = _
+ var clientQuotaMetadataManager: ClientQuotaMetadataManager = _
var controllerApis: ControllerApis = _
var controllerApisHandlerPool: KafkaRequestHandlerPool = _
var migrationSupport: Option[ControllerMigrationSupport] = None
+ def kafkaYammerMetrics: KafkaYammerMetrics = KafkaYammerMetrics.INSTANCE
private def maybeChangeStatus(from: ProcessStatus, to: ProcessStatus):
Boolean = {
lock.lock()
@@ -118,13 +122,6 @@ class ControllerServer(
true
}
- private def doRemoteKraftSetup(): Unit = {
- // Explicitly configure metric reporters on this remote controller.
- // We do not yet support dynamic reconfiguration on remote controllers in
general;
- // remove this once that is implemented.
- new DynamicMetricReporterState(config.nodeId, config, metrics, clusterId)
- }
-
def clusterId: String = sharedServer.metaProps.clusterId
def startup(): Unit = {
@@ -243,11 +240,6 @@ class ControllerServer(
}
controller = controllerBuilder.build()
- // Perform any setup that is done only when this node is a
controller-only node.
- if (!config.processRoles.contains(BrokerRole)) {
- doRemoteKraftSetup()
- }
-
if (config.migrationEnabled) {
val zkClient = KafkaZkClient.createZkClient("KRaft Migration", time,
config, KafkaServer.zkClientConfigFromKafkaConfig(config))
val migrationClient = new ZkMigrationClient(zkClient)
@@ -272,6 +264,7 @@ class ControllerServer(
metrics,
time,
threadNamePrefix)
+ clientQuotaMetadataManager = new
ClientQuotaMetadataManager(quotaManagers, socketServer.connectionQuotas)
controllerApis = new ControllerApis(socketServer.dataPlaneRequestChannel,
authorizer,
quotaManagers,
@@ -309,6 +302,26 @@ class ControllerServer(
FutureUtils.waitWithLogging(logger.underlying, "all of the SocketServer
Acceptors to be started",
socketServerFuture, startupDeadline, time)
+ // register this instance for dynamic config changes to the KafkaConfig
+ config.dynamicConfig.addReconfigurables(this)
+ // We must install the below publisher and receive the changes when we
are also running the broker role
+ // because we don't share a single KafkaConfig instance with the broker,
and therefore
+ // the broker's DynamicConfigPublisher won't take care of the changes
for us.
+ val dynamicConfigHandlers = immutable.Map[String, ConfigHandler](
+ // controllers don't host topics, so no need to do anything with
dynamic topic config changes here
+ ConfigType.Broker -> new BrokerConfigHandler(config, quotaManagers))
+ val dynamicConfigPublisher = new DynamicConfigPublisher(
+ config,
+ sharedServer.metadataPublishingFaultHandler,
+ dynamicConfigHandlers,
+ "controller")
+ val dynamicClientQuotaPublisher = new DynamicClientQuotaPublisher(
+ config,
+ sharedServer.metadataPublishingFaultHandler,
+ "controller",
+ clientQuotaMetadataManager)
+ FutureUtils.waitWithLogging(logger.underlying, "all of the dynamic
config and client quota publishers to be installed",
+ sharedServer.loader.installPublishers(List(dynamicConfigPublisher,
dynamicClientQuotaPublisher).asJava), startupDeadline, time)
} catch {
case e: Throwable =>
maybeChangeStatus(STARTING, STARTED)
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index b924648c691..940580d155b 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -25,6 +25,7 @@ import kafka.cluster.EndPoint
import kafka.log.{LogCleaner, LogManager}
import kafka.network.{DataPlaneAcceptor, SocketServer}
import kafka.server.DynamicBrokerConfig._
+import kafka.server.KafkaRaftServer.BrokerRole
import kafka.utils.{CoreUtils, Logging, PasswordEncoder}
import kafka.utils.Implicits._
import kafka.zk.{AdminZkClient, KafkaZkClient}
@@ -262,15 +263,35 @@ class DynamicBrokerConfig(private val kafkaConfig:
KafkaConfig) extends Logging
}
addReconfigurable(kafkaServer.kafkaYammerMetrics)
addReconfigurable(new DynamicMetricsReporters(kafkaConfig.brokerId,
kafkaServer.config, kafkaServer.metrics, kafkaServer.clusterId))
- addReconfigurable(new DynamicClientQuotaCallback(kafkaServer))
+ addReconfigurable(new
DynamicClientQuotaCallback(kafkaServer.quotaManagers, kafkaServer.config))
- addBrokerReconfigurable(new DynamicThreadPool(kafkaServer))
+ addBrokerReconfigurable(new BrokerDynamicThreadPool(kafkaServer))
addBrokerReconfigurable(new DynamicLogConfig(kafkaServer.logManager,
kafkaServer))
addBrokerReconfigurable(new DynamicListenerConfig(kafkaServer))
addBrokerReconfigurable(kafkaServer.socketServer)
addBrokerReconfigurable(new
DynamicProducerStateManagerConfig(kafkaServer.logManager.producerStateManagerConfig))
}
+ /**
+ * Add reconfigurables to be notified when a dynamic controller config is
updated.
+ */
+ def addReconfigurables(controller: ControllerServer): Unit = {
+ controller.authorizer match {
+ case Some(authz: Reconfigurable) => addReconfigurable(authz)
+ case _ =>
+ }
+ if (!kafkaConfig.processRoles.contains(BrokerRole)) {
+ // only add these if the controller isn't also running the broker role
+ // because these would already be added via the broker in that case
+ addReconfigurable(controller.kafkaYammerMetrics)
+ addReconfigurable(new DynamicMetricsReporters(kafkaConfig.nodeId,
controller.config, controller.metrics, controller.clusterId))
+ }
+ addReconfigurable(new DynamicClientQuotaCallback(controller.quotaManagers,
controller.config))
+ addBrokerReconfigurable(new ControllerDynamicThreadPool(controller))
+ // TODO: addBrokerReconfigurable(new DynamicListenerConfig(controller))
+ addBrokerReconfigurable(controller.socketServer)
+ }
+
def addReconfigurable(reconfigurable: Reconfigurable): Unit =
CoreUtils.inWriteLock(lock) {
verifyReconfigurableConfigs(reconfigurable.reconfigurableConfigs.asScala)
reconfigurables.add(reconfigurable)
@@ -704,19 +725,12 @@ object DynamicThreadPool {
KafkaConfig.NumReplicaFetchersProp,
KafkaConfig.NumRecoveryThreadsPerDataDirProp,
KafkaConfig.BackgroundThreadsProp)
-}
-class DynamicThreadPool(server: KafkaBroker) extends BrokerReconfigurable {
-
- override def reconfigurableConfigs: Set[String] = {
- DynamicThreadPool.ReconfigurableConfigs
- }
-
- override def validateReconfiguration(newConfig: KafkaConfig): Unit = {
+ def validateReconfiguration(currentConfig: KafkaConfig, newConfig:
KafkaConfig): Unit = {
newConfig.values.forEach { (k, v) =>
- if (DynamicThreadPool.ReconfigurableConfigs.contains(k)) {
+ if (ReconfigurableConfigs.contains(k)) {
val newValue = v.asInstanceOf[Int]
- val oldValue = currentValue(k)
+ val oldValue = getValue(currentConfig, k)
if (newValue != oldValue) {
val errorMsg = s"Dynamic thread count update validation failed for
$k=$v"
if (newValue <= 0)
@@ -730,6 +744,43 @@ class DynamicThreadPool(server: KafkaBroker) extends
BrokerReconfigurable {
}
}
+ def getValue(config: KafkaConfig, name: String): Int = {
+ name match {
+ case KafkaConfig.NumIoThreadsProp => config.numIoThreads
+ case KafkaConfig.NumReplicaFetchersProp => config.numReplicaFetchers
+ case KafkaConfig.NumRecoveryThreadsPerDataDirProp =>
config.numRecoveryThreadsPerDataDir
+ case KafkaConfig.BackgroundThreadsProp => config.backgroundThreads
+ case n => throw new IllegalStateException(s"Unexpected config $n")
+ }
+ }
+}
+
+class ControllerDynamicThreadPool(controller: ControllerServer) extends
BrokerReconfigurable {
+
+ override def reconfigurableConfigs: Set[String] = {
+ Set(KafkaConfig.NumIoThreadsProp)
+ }
+
+ override def validateReconfiguration(newConfig: KafkaConfig): Unit = {
+ DynamicThreadPool.validateReconfiguration(controller.config, newConfig) //
common validation
+ }
+
+ override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig):
Unit = {
+ if (newConfig.numIoThreads != oldConfig.numIoThreads)
+
controller.controllerApisHandlerPool.resizeThreadPool(newConfig.numIoThreads)
+ }
+}
+
+class BrokerDynamicThreadPool(server: KafkaBroker) extends
BrokerReconfigurable {
+
+ override def reconfigurableConfigs: Set[String] = {
+ DynamicThreadPool.ReconfigurableConfigs
+ }
+
+ override def validateReconfiguration(newConfig: KafkaConfig): Unit = {
+ DynamicThreadPool.validateReconfiguration(server.config, newConfig)
+ }
+
override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig):
Unit = {
if (newConfig.numIoThreads != oldConfig.numIoThreads)
server.dataPlaneRequestHandlerPool.resizeThreadPool(newConfig.numIoThreads)
@@ -740,16 +791,6 @@ class DynamicThreadPool(server: KafkaBroker) extends
BrokerReconfigurable {
if (newConfig.backgroundThreads != oldConfig.backgroundThreads)
server.kafkaScheduler.resizeThreadPool(newConfig.backgroundThreads)
}
-
- private def currentValue(name: String): Int = {
- name match {
- case KafkaConfig.NumIoThreadsProp => server.config.numIoThreads
- case KafkaConfig.NumReplicaFetchersProp =>
server.config.numReplicaFetchers
- case KafkaConfig.NumRecoveryThreadsPerDataDirProp =>
server.config.numRecoveryThreadsPerDataDir
- case KafkaConfig.BackgroundThreadsProp => server.config.backgroundThreads
- case n => throw new IllegalStateException(s"Unexpected config $n")
- }
- }
}
class DynamicMetricsReporters(brokerId: Int, config: KafkaConfig, metrics:
Metrics, clusterId: String) extends Reconfigurable {
@@ -911,13 +952,16 @@ object DynamicListenerConfig {
)
}
-class DynamicClientQuotaCallback(server: KafkaBroker) extends Reconfigurable {
+class DynamicClientQuotaCallback(
+ quotaManagers: QuotaFactory.QuotaManagers,
+ serverConfig: KafkaConfig
+) extends Reconfigurable {
override def configure(configs: util.Map[String, _]): Unit = {}
override def reconfigurableConfigs(): util.Set[String] = {
val configs = new util.HashSet[String]()
- server.quotaManagers.clientQuotaCallback.foreach {
+ quotaManagers.clientQuotaCallback.foreach {
case callback: Reconfigurable =>
configs.addAll(callback.reconfigurableConfigs)
case _ =>
}
@@ -925,17 +969,16 @@ class DynamicClientQuotaCallback(server: KafkaBroker)
extends Reconfigurable {
}
override def validateReconfiguration(configs: util.Map[String, _]): Unit = {
- server.quotaManagers.clientQuotaCallback.foreach {
+ quotaManagers.clientQuotaCallback.foreach {
case callback: Reconfigurable =>
callback.validateReconfiguration(configs)
case _ =>
}
}
override def reconfigure(configs: util.Map[String, _]): Unit = {
- val config = server.config
- server.quotaManagers.clientQuotaCallback.foreach {
+ quotaManagers.clientQuotaCallback.foreach {
case callback: Reconfigurable =>
- config.dynamicConfig.maybeReconfigure(callback,
config.dynamicConfig.currentKafkaConfig, configs)
+ serverConfig.dynamicConfig.maybeReconfigure(callback,
serverConfig.dynamicConfig.currentKafkaConfig, configs)
true
case _ => false
}
diff --git
a/core/src/main/scala/kafka/server/metadata/DynamicClientQuotaPublisher.scala
b/core/src/main/scala/kafka/server/metadata/DynamicClientQuotaPublisher.scala
new file mode 100644
index 00000000000..0533161b6d1
--- /dev/null
+++
b/core/src/main/scala/kafka/server/metadata/DynamicClientQuotaPublisher.scala
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server.metadata
+
+import kafka.server.KafkaConfig
+import kafka.utils.Logging
+import org.apache.kafka.image.loader.{LogDeltaManifest, SnapshotManifest}
+import org.apache.kafka.image.{MetadataDelta, MetadataImage}
+import org.apache.kafka.server.fault.FaultHandler
+
+
+class DynamicClientQuotaPublisher(
+ conf: KafkaConfig,
+ faultHandler: FaultHandler,
+ nodeType: String,
+ clientQuotaMetadataManager: ClientQuotaMetadataManager,
+) extends Logging with org.apache.kafka.image.publisher.MetadataPublisher {
+ logIdent = s"[${name()}] "
+
+ def publish(delta: MetadataDelta, newImage: MetadataImage): Unit = {
+ val deltaName = s"MetadataDelta up to
${newImage.highestOffsetAndEpoch().offset}"
+ try {
+ Option(delta.clientQuotasDelta()).foreach { clientQuotasDelta =>
+ clientQuotaMetadataManager.update(clientQuotasDelta)
+ }
+ } catch {
+ case t: Throwable => faultHandler.handleFault("Uncaught exception while
" +
+ s"publishing dynamic client quota changes from ${deltaName}", t)
+ }
+ }
+
+ /**
+ * Returns the name of this publisher.
+ *
+ * @return The publisher name.
+ */
+ override def name(): String = s"DynamicClientQuotaPublisher ${nodeType}
id=${conf.nodeId}"
+
+ /**
+ * Publish a new cluster metadata snapshot that we loaded.
+ *
+ * @param delta The delta between the previous state and the new one.
+ * @param newImage The complete new state.
+ * @param manifest The contents of what was published.
+ */
+ override def publishSnapshot(delta: MetadataDelta, newImage: MetadataImage,
manifest: SnapshotManifest): Unit = {
+ publish(delta, newImage)
+ }
+
+ /**
+ * Publish a change to the cluster metadata.
+ *
+ * @param delta The delta between the previous state and the new one.
+ * @param newImage The complete new state.
+ * @param manifest The contents of what was published.
+ */
+ override def publishLogDelta(delta: MetadataDelta, newImage: MetadataImage,
manifest: LogDeltaManifest): Unit = {
+ publish(delta, newImage)
+ }
+
+ /**
+ * Close this metadata publisher.
+ */
+ override def close(): Unit = {
+ // nothing to close
+ }
+}
diff --git
a/core/src/main/scala/kafka/server/metadata/DynamicConfigPublisher.scala
b/core/src/main/scala/kafka/server/metadata/DynamicConfigPublisher.scala
index 12ff51d4039..a3581350137 100644
--- a/core/src/main/scala/kafka/server/metadata/DynamicConfigPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/DynamicConfigPublisher.scala
@@ -22,6 +22,7 @@ import kafka.server.ConfigAdminManager.toLoggableProps
import kafka.server.{ConfigEntityName, ConfigHandler, ConfigType, KafkaConfig}
import kafka.utils.Logging
import org.apache.kafka.common.config.ConfigResource.Type.{BROKER, TOPIC}
+import org.apache.kafka.image.loader.{LogDeltaManifest, SnapshotManifest}
import org.apache.kafka.image.{MetadataDelta, MetadataImage}
import org.apache.kafka.server.fault.FaultHandler
@@ -30,9 +31,9 @@ class DynamicConfigPublisher(
conf: KafkaConfig,
faultHandler: FaultHandler,
dynamicConfigHandlers: Map[String, ConfigHandler],
- nodeType: String
-) extends Logging {
- logIdent = s"[DynamicConfigPublisher nodeType=${nodeType} id=${conf.nodeId}]
"
+ nodeType: String,
+) extends Logging with org.apache.kafka.image.publisher.MetadataPublisher {
+ logIdent = s"[${name()}] "
def publish(delta: MetadataDelta, newImage: MetadataImage): Unit = {
val deltaName = s"MetadataDelta up to
${newImage.highestOffsetAndEpoch().offset}"
@@ -100,4 +101,40 @@ class DynamicConfigPublisher(
def reloadUpdatedFilesWithoutConfigChange(props: Properties): Unit = {
conf.dynamicConfig.reloadUpdatedFilesWithoutConfigChange(props)
}
+
+ /**
+ * Returns the name of this publisher.
+ *
+ * @return The publisher name.
+ */
+ override def name(): String = s"DynamicConfigPublisher ${nodeType}
id=${conf.nodeId}"
+
+ /**
+ * Publish a new cluster metadata snapshot that we loaded.
+ *
+ * @param delta The delta between the previous state and the new one.
+ * @param newImage The complete new state.
+ * @param manifest The contents of what was published.
+ */
+ override def publishSnapshot(delta: MetadataDelta, newImage: MetadataImage,
manifest: SnapshotManifest): Unit = {
+ publish(delta, newImage)
+ }
+
+ /**
+ * Publish a change to the cluster metadata.
+ *
+ * @param delta The delta between the previous state and the new one.
+ * @param newImage The complete new state.
+ * @param manifest The contents of what was published.
+ */
+ override def publishLogDelta(delta: MetadataDelta, newImage: MetadataImage,
manifest: LogDeltaManifest): Unit = {
+ publish(delta, newImage)
+ }
+
+ /**
+ * Close this metadata publisher.
+ */
+ override def close(): Unit = {
+ // nothing to close
+ }
}
diff --git a/core/src/test/java/kafka/test/MockController.java
b/core/src/test/java/kafka/test/MockController.java
index bd9dd5a649a..289ef75a560 100644
--- a/core/src/test/java/kafka/test/MockController.java
+++ b/core/src/test/java/kafka/test/MockController.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.NotControllerException;
+import org.apache.kafka.common.errors.ThrottlingQuotaExceededException;
import org.apache.kafka.common.message.AllocateProducerIdsRequestData;
import org.apache.kafka.common.message.AllocateProducerIdsResponseData;
import org.apache.kafka.common.message.AlterPartitionRequestData;
@@ -104,6 +105,11 @@ public class MockController implements Controller {
return this;
}
+ public Builder newInitialTopic(String name, Uuid id, int
numPartitions) {
+ initialTopics.put(name, new MockTopic(name, id, numPartitions));
+ return this;
+ }
+
public MockController build() {
return new MockController(initialTopics.values());
}
@@ -149,30 +155,37 @@ public class MockController implements Controller {
} else {
long topicId = nextTopicId.getAndIncrement();
Uuid topicUuid = new Uuid(0, topicId);
- topicNameToId.put(topic.name(), topicUuid);
- topics.put(topicUuid, new MockTopic(topic.name(), topicUuid));
+ MockTopic mockTopic = new MockTopic(topic.name(), topicUuid);
CreatableTopicResult creatableTopicResult = new
CreatableTopicResult().
setName(topic.name()).
- setErrorCode(Errors.NONE.code()).
- setTopicId(topicUuid);
- if (describable.contains(topic.name())) {
- // Note: we don't simulate topic configs here yet.
- // Just returning replication factor and numPartitions.
- if (topic.assignments() != null &&
!topic.assignments().isEmpty()) {
- creatableTopicResult.
- setTopicConfigErrorCode(Errors.NONE.code()).
- setReplicationFactor((short)
-
topic.assignments().iterator().next().brokerIds().size()).
- setNumPartitions(topic.assignments().size());
+ setErrorCode(Errors.NONE.code());
+ try {
+ context.applyPartitionChangeQuota(mockTopic.numPartitions);
+ creatableTopicResult.setTopicId(topicUuid);
+ topicNameToId.put(topic.name(), topicUuid);
+ topics.put(topicUuid, mockTopic);
+ if (describable.contains(topic.name())) {
+ // Note: we don't simulate topic configs here yet.
+ // Just returning replication factor and numPartitions.
+ if (topic.assignments() != null &&
!topic.assignments().isEmpty()) {
+ creatableTopicResult.
+ setTopicConfigErrorCode(Errors.NONE.code()).
+ setReplicationFactor((short)
+
topic.assignments().iterator().next().brokerIds().size()).
+ setNumPartitions(topic.assignments().size());
+ } else {
+ creatableTopicResult.
+ setTopicConfigErrorCode(Errors.NONE.code()).
+
setReplicationFactor(topic.replicationFactor()).
+ setNumPartitions(topic.numPartitions());
+ }
} else {
creatableTopicResult.
- setTopicConfigErrorCode(Errors.NONE.code()).
- setReplicationFactor(topic.replicationFactor()).
- setNumPartitions(topic.numPartitions());
+
setTopicConfigErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code());
}
- } else {
- creatableTopicResult.
-
setTopicConfigErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code());
+ } catch (ThrottlingQuotaExceededException e) {
+ ApiError apiError = new
ApiError(Errors.THROTTLING_QUOTA_EXCEEDED);
+
creatableTopicResult.setErrorCode(apiError.error().code()).setErrorMessage(apiError.message());
}
response.topics().add(creatableTopicResult);
}
@@ -191,10 +204,16 @@ public class MockController implements Controller {
static class MockTopic {
private final String name;
private final Uuid id;
+ private final int numPartitions;
MockTopic(String name, Uuid id) {
+ this(name, id, 1);
+ }
+
+ MockTopic(String name, Uuid id, int numPartitions) {
this.name = name;
this.id = id;
+ this.numPartitions = numPartitions;
}
}
@@ -260,12 +279,18 @@ public class MockController implements Controller {
}
Map<Uuid, ApiError> results = new HashMap<>();
for (Uuid topicId : topicIds) {
- MockTopic topic = topics.remove(topicId);
+ MockTopic topic = topics.get(topicId);
if (topic == null) {
results.put(topicId, new ApiError(Errors.UNKNOWN_TOPIC_ID));
} else {
- topicNameToId.remove(topic.name);
- results.put(topicId, ApiError.NONE);
+ try {
+ context.applyPartitionChangeQuota(topic.numPartitions);
+ topics.remove(topicId);
+ topicNameToId.remove(topic.name);
+ results.put(topicId, ApiError.NONE);
+ } catch (ThrottlingQuotaExceededException e) {
+ results.put(topicId, new
ApiError(Errors.THROTTLING_QUOTA_EXCEEDED));
+ }
}
}
return CompletableFuture.completedFuture(results);
@@ -434,9 +459,18 @@ public class MockController implements Controller {
List<CreatePartitionsTopicResult> results = new ArrayList<>();
for (CreatePartitionsTopic topic : topicList) {
if (topicNameToId.containsKey(topic.name())) {
- results.add(new
CreatePartitionsTopicResult().setName(topic.name()).
- setErrorCode(Errors.NONE.code()).
- setErrorMessage(null));
+ try {
+ context.applyPartitionChangeQuota(topic.count());
+ results.add(new
CreatePartitionsTopicResult().setName(topic.name()).
+ setErrorCode(Errors.NONE.code()).
+ setErrorMessage(null));
+ } catch (ThrottlingQuotaExceededException e) {
+ ApiError apiError = new
ApiError(Errors.THROTTLING_QUOTA_EXCEEDED);
+ results.add(new CreatePartitionsTopicResult().
+ setName(topic.name()).
+ setErrorCode(apiError.error().code()).
+ setErrorMessage(apiError.message()));
+ }
} else {
results.add(new
CreatePartitionsTopicResult().setName(topic.name()).
setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()).
diff --git
a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
index 768b61e70d4..aee0c182ec2 100644
--- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
+++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
@@ -31,14 +31,19 @@ import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.Errors._
import org.apache.kafka.common.quota.{ClientQuotaAlteration,
ClientQuotaEntity, ClientQuotaFilter, ClientQuotaFilterComponent}
import org.apache.kafka.common.requests.{ApiError, DescribeClusterRequest,
DescribeClusterResponse}
-import org.apache.kafka.common.{Endpoint, TopicPartition, TopicPartitionInfo}
+import org.apache.kafka.common.security.auth.KafkaPrincipal
+import org.apache.kafka.common.{Cluster, Endpoint, Reconfigurable,
TopicPartition, TopicPartitionInfo}
import org.apache.kafka.image.ClusterImage
import org.apache.kafka.metadata.BrokerState
import org.apache.kafka.server.authorizer._
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
+import org.apache.kafka.server.quota
+import org.apache.kafka.server.quota.{ClientQuotaCallback, ClientQuotaType}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{Tag, Test, Timeout}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
import org.slf4j.LoggerFactory
import java.io.File
@@ -581,10 +586,10 @@ class KRaftClusterTest {
assertEquals(Seq(ApiError.NONE), incrementalAlter(admin, Seq(
(new ConfigResource(Type.BROKER, ""), Seq(
new AlterConfigOp(new ConfigEntry("log.roll.ms", "1234567"),
OpType.SET),
- new AlterConfigOp(new ConfigEntry("max.connections.per.ip", "6"),
OpType.SET))))))
+ new AlterConfigOp(new ConfigEntry("max.connections.per.ip", "60"),
OpType.SET))))))
validateConfigs(admin, Map(new ConfigResource(Type.BROKER, "") -> Seq(
("log.roll.ms", "1234567"),
- ("max.connections.per.ip", "6"))), true)
+ ("max.connections.per.ip", "60"))), true)
admin.createTopics(Arrays.asList(
new NewTopic("foo", 2, 3.toShort),
@@ -967,6 +972,48 @@ class KRaftClusterTest {
cluster.close()
}
}
+
+ @ParameterizedTest
+ @ValueSource(booleans = Array(true))
+ def testReconfigureControllerClientQuotas(combinedController: Boolean): Unit
= {
+ val cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder().
+ setNumBrokerNodes(1).
+ setCoResident(combinedController).
+ setNumControllerNodes(1).build()).
+ setConfigProp("client.quota.callback.class",
classOf[DummyClientQuotaCallback].getName).
+
setConfigProp(DummyClientQuotaCallback.dummyClientQuotaCallbackValueConfigKey,
"0").
+ build()
+
+ def assertConfigValue(expected: Int): Unit = {
+ TestUtils.retry(60000) {
+ assertEquals(expected,
cluster.controllers().values().iterator().next().
+
quotaManagers.clientQuotaCallback.get.asInstanceOf[DummyClientQuotaCallback].value)
+ assertEquals(expected, cluster.brokers().values().iterator().next().
+
quotaManagers.clientQuotaCallback.get.asInstanceOf[DummyClientQuotaCallback].value)
+ }
+ }
+
+ try {
+ cluster.format()
+ cluster.startup()
+ cluster.waitForReadyBrokers()
+ assertConfigValue(0)
+ val admin = Admin.create(cluster.clientProperties())
+ try {
+ admin.incrementalAlterConfigs(
+ Collections.singletonMap(new ConfigResource(Type.BROKER, ""),
+ Collections.singletonList(new AlterConfigOp(
+ new
ConfigEntry(DummyClientQuotaCallback.dummyClientQuotaCallbackValueConfigKey,
"1"), OpType.SET)))).
+ all().get()
+ } finally {
+ admin.close()
+ }
+ assertConfigValue(1)
+ } finally {
+ cluster.close()
+ }
+ }
}
class BadAuthorizer() extends Authorizer {
@@ -987,3 +1034,39 @@ class BadAuthorizer() extends Authorizer {
override def deleteAcls(requestContext: AuthorizableRequestContext,
aclBindingFilters: util.List[AclBindingFilter]): util.List[_ <:
CompletionStage[AclDeleteResult]] = ???
}
+
object DummyClientQuotaCallback {
  // Config key whose integer value is mirrored into DummyClientQuotaCallback.value.
  val dummyClientQuotaCallbackValueConfigKey = "dummy.client.quota.callback.value"
}

/**
 * A reconfigurable ClientQuotaCallback stub used to verify that dynamic configuration changes
 * propagate to both brokers and controllers: `value` mirrors the last configured
 * `dummy.client.quota.callback.value` and is polled by the test. All quota callbacks are no-ops
 * or fixed values.
 */
class DummyClientQuotaCallback() extends ClientQuotaCallback with Reconfigurable {
  // Written by configure()/reconfigure() on the config-update path and read from the test
  // thread's retry loop, so it must be @volatile for cross-thread visibility (matches the
  // @volatile pattern used by TestAuthorizer in DynamicBrokerConfigTest).
  @volatile var value = 0

  override def quotaMetricTags(quotaType: ClientQuotaType, principal: KafkaPrincipal, clientId: String): util.Map[String, String] =
    Collections.emptyMap()

  // Fixed limit; the actual limit is irrelevant to the reconfiguration test.
  override def quotaLimit(quotaType: ClientQuotaType, metricTags: util.Map[String, String]): lang.Double = 1.0

  override def updateQuota(quotaType: ClientQuotaType, quotaEntity: quota.ClientQuotaEntity, newValue: Double): Unit = {}

  override def removeQuota(quotaType: ClientQuotaType, quotaEntity: quota.ClientQuotaEntity): Unit = {}

  override def quotaResetRequired(quotaType: ClientQuotaType): Boolean = true

  override def updateClusterMetadata(cluster: Cluster): Boolean = false

  override def close(): Unit = {}

  override def configure(configs: util.Map[String, _]): Unit = {
    // Only update when the key is present so unrelated (re)configurations keep the current value.
    Option(configs.get(DummyClientQuotaCallback.dummyClientQuotaCallbackValueConfigKey)).foreach { v =>
      value = Integer.parseInt(v.toString)
    }
  }

  override def reconfigurableConfigs(): util.Set[String] =
    Set(DummyClientQuotaCallback.dummyClientQuotaCallbackValueConfigKey).asJava

  override def validateReconfiguration(configs: util.Map[String, _]): Unit = {}

  override def reconfigure(configs: util.Map[String, _]): Unit = configure(configs)
}
+
diff --git a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
index c5919282802..ffa56de63db 100644
--- a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
@@ -70,6 +70,27 @@ import scala.jdk.CollectionConverters._
import scala.reflect.ClassTag
class ControllerApisTest {
+ object MockControllerMutationQuota {
+ val errorMessage = "quota exceeded in test"
+ var throttleTimeMs = 1000
+ }
+
+ case class MockControllerMutationQuota(quota: Int) extends
ControllerMutationQuota {
+ var permitsRecorded = 0.0
+
+ override def isExceeded: Boolean = permitsRecorded > quota
+
+ override def record(permits: Double): Unit = {
+ if (permits >= 0) {
+ permitsRecorded += permits
+ if (isExceeded)
+ throw new ThrottlingQuotaExceededException(throttleTime,
MockControllerMutationQuota.errorMessage)
+ }
+ }
+
+ override def throttleTime: Int = if (isExceeded)
MockControllerMutationQuota.throttleTimeMs else 0
+ }
+
private val nodeId = 1
private val brokerRack = "Rack1"
private val clientID = "Client1"
@@ -78,15 +99,38 @@ class ControllerApisTest {
private val time = new MockTime
private val clientQuotaManager: ClientQuotaManager =
mock(classOf[ClientQuotaManager])
private val clientRequestQuotaManager: ClientRequestQuotaManager =
mock(classOf[ClientRequestQuotaManager])
- private val clientControllerQuotaManager: ControllerMutationQuotaManager =
mock(classOf[ControllerMutationQuotaManager])
+ private val neverThrottlingClientControllerQuotaManager:
ControllerMutationQuotaManager = mock(classOf[ControllerMutationQuotaManager])
+ when(neverThrottlingClientControllerQuotaManager.newQuotaFor(
+ any(classOf[RequestChannel.Request]),
+ any(classOf[Short])
+ )).thenReturn(
+ MockControllerMutationQuota(Integer.MAX_VALUE) // never throttles
+ )
+ private val alwaysThrottlingClientControllerQuotaManager:
ControllerMutationQuotaManager = mock(classOf[ControllerMutationQuotaManager])
+ when(alwaysThrottlingClientControllerQuotaManager.newQuotaFor(
+ any(classOf[RequestChannel.Request]),
+ any(classOf[Short])
+ )).thenReturn(
+ MockControllerMutationQuota(0) // always throttles
+ )
private val replicaQuotaManager: ReplicationQuotaManager =
mock(classOf[ReplicationQuotaManager])
private val raftManager: RaftManager[ApiMessageAndVersion] =
mock(classOf[RaftManager[ApiMessageAndVersion]])
- private val quotas = QuotaManagers(
+ private val quotasNeverThrottleControllerMutations = QuotaManagers(
+ clientQuotaManager,
+ clientQuotaManager,
+ clientRequestQuotaManager,
+ neverThrottlingClientControllerQuotaManager,
+ replicaQuotaManager,
+ replicaQuotaManager,
+ replicaQuotaManager,
+ None)
+
+ private val quotasAlwaysThrottleControllerMutations = QuotaManagers(
clientQuotaManager,
clientQuotaManager,
clientRequestQuotaManager,
- clientControllerQuotaManager,
+ alwaysThrottlingClientControllerQuotaManager,
replicaQuotaManager,
replicaQuotaManager,
replicaQuotaManager,
@@ -94,7 +138,8 @@ class ControllerApisTest {
private def createControllerApis(authorizer: Option[Authorizer],
controller: Controller,
- props: Properties = new Properties()):
ControllerApis = {
+ props: Properties = new Properties(),
+ throttle: Boolean = false): ControllerApis
= {
props.put(KafkaConfig.NodeIdProp, nodeId: java.lang.Integer)
props.put(KafkaConfig.ProcessRolesProp, "controller")
props.put(KafkaConfig.ControllerListenerNamesProp, "PLAINTEXT")
@@ -102,7 +147,7 @@ class ControllerApisTest {
new ControllerApis(
requestChannel,
authorizer,
- quotas,
+ if (throttle) quotasAlwaysThrottleControllerMutations else
quotasNeverThrottleControllerMutations,
time,
controller,
raftManager,
@@ -562,6 +607,34 @@ class ControllerApisTest {
_ => Set("baz")).get().topics().asScala.toSet)
}
+ @ParameterizedTest(name = "testCreateTopicsMutationQuota with throttle: {0}")
+ @ValueSource(booleans = Array(true, false))
+ def testCreateTopicsMutationQuota(throttle: Boolean): Unit = {
+ val controller = new MockController.Builder().build()
+ val controllerApis = createControllerApis(None, controller, new
Properties(), throttle)
+ val topicName = "foo"
+ val requestData = new CreateTopicsRequestData().setTopics(new
CreatableTopicCollection(
+ util.Collections.singletonList(new
CreatableTopic().setName(topicName).setNumPartitions(1).setReplicationFactor(1)).iterator()))
+ val request = new CreateTopicsRequest.Builder(requestData).build()
+ val expectedResponseDataUnthrottled = Set(new
CreatableTopicResult().setName(topicName).
+ setErrorCode(NONE.code()).
+ setTopicId(new Uuid(0L, 1L)).
+ setNumPartitions(1).
+ setReplicationFactor(1).
+ setTopicConfigErrorCode(NONE.code()))
+ val expectedResponseDataThrottled = Set(new
CreatableTopicResult().setName(topicName).
+ setErrorCode(THROTTLING_QUOTA_EXCEEDED.code()).
+ setErrorMessage(THROTTLING_QUOTA_EXCEEDED.message()))
+ val response = handleRequest[CreateTopicsResponse](request, controllerApis)
+ if (throttle) {
+ assertEquals(expectedResponseDataThrottled,
response.data.topics().asScala.toSet)
+ assertEquals(MockControllerMutationQuota.throttleTimeMs,
response.throttleTimeMs())
+ } else {
+ assertEquals(expectedResponseDataUnthrottled,
response.data.topics().asScala.toSet)
+ assertEquals(0, response.throttleTimeMs())
+ }
+ }
+
@Test
def testDeleteTopicsByName(): Unit = {
val fooId = Uuid.fromString("vZKYST0pSA2HO5x_6hoO2Q")
@@ -831,6 +904,32 @@ class ControllerApisTest {
assertEquals(Some(Errors.TOPIC_AUTHORIZATION_FAILED), results.find(_.name
== "bar").map(result => Errors.forCode(result.errorCode)))
}
+ @ParameterizedTest(name = "testCreatePartitionsMutationQuota with throttle:
{0}")
+ @ValueSource(booleans = Array(true, false))
+ def testCreatePartitionsMutationQuota(throttle: Boolean): Unit = {
+ val topicName = "foo"
+ val controller = new MockController.Builder()
+ .newInitialTopic(topicName, Uuid.fromString("vZKYST0pSA2HO5x_6hoO2Q"), 1)
+ .build()
+ val controllerApis = createControllerApis(None, controller, new
Properties(), throttle)
+ val requestData = new CreatePartitionsRequestData()
+ requestData.topics().add(new
CreatePartitionsTopic().setName(topicName).setAssignments(null).setCount(2))
+ val request = new CreatePartitionsRequest.Builder(requestData).build()
+ val expectedResponseDataUnthrottled = Set(new
CreatePartitionsTopicResult().setName(topicName).
+ setErrorCode(NONE.code()))
+ val expectedResponseDataThrottled = Set(new
CreatePartitionsTopicResult().setName(topicName).
+ setErrorCode(THROTTLING_QUOTA_EXCEEDED.code()).
+ setErrorMessage(THROTTLING_QUOTA_EXCEEDED.message()))
+ val response = handleRequest[CreatePartitionsResponse](request,
controllerApis)
+ if (throttle) {
+ assertEquals(expectedResponseDataThrottled,
response.data.results().asScala.toSet)
+ assertEquals(MockControllerMutationQuota.throttleTimeMs,
response.throttleTimeMs())
+ } else {
+ assertEquals(expectedResponseDataUnthrottled,
response.data.results().asScala.toSet)
+ assertEquals(0, response.throttleTimeMs())
+ }
+ }
+
@Test
def testElectLeadersAuthorization(): Unit = {
val authorizer = mock(classOf[Authorizer])
@@ -918,6 +1017,35 @@ class ControllerApisTest {
assertEquals(Errors.NOT_CONTROLLER,
Errors.forCode(topicIdResponse.errorCode))
}
+ @ParameterizedTest(name = "testDeleteTopicsMutationQuota with throttle: {0}")
+ @ValueSource(booleans = Array(true, false))
+ def testDeleteTopicsMutationQuota(throttle: Boolean): Unit = {
+ val topicId = Uuid.fromString("vZKYST0pSA2HO5x_6hoO2Q")
+ val topicName = "foo"
+ val controller = new MockController.Builder()
+ .newInitialTopic(topicName, topicId, 1)
+ .build()
+ val controllerApis = createControllerApis(None, controller, new
Properties(), throttle)
+ val requestData = new DeleteTopicsRequestData().setTopics(singletonList(
+ new DeleteTopicState().setTopicId(topicId)))
+ val request = new DeleteTopicsRequest.Builder(requestData).build()
+ val expectedResponseDataUnthrottled = Set(new
DeletableTopicResult().setName(topicName).
+ setTopicId(topicId).
+ setErrorCode(NONE.code()))
+ val expectedResponseDataThrottled = Set(new
DeletableTopicResult().setName(topicName).
+ setTopicId(topicId).
+ setErrorCode(THROTTLING_QUOTA_EXCEEDED.code()).
+ setErrorMessage(THROTTLING_QUOTA_EXCEEDED.message()))
+ val response = handleRequest[DeleteTopicsResponse](request, controllerApis)
+ if (throttle) {
+ assertEquals(expectedResponseDataThrottled,
response.data.responses().asScala.toSet)
+ assertEquals(MockControllerMutationQuota.throttleTimeMs,
response.throttleTimeMs())
+ } else {
+ assertEquals(expectedResponseDataUnthrottled,
response.data.responses().asScala.toSet)
+ assertEquals(0, response.throttleTimeMs())
+ }
+ }
+
@Test
def testAllocateProducerIdsReturnsNotController(): Unit = {
val controller = mock(classOf[Controller])
@@ -1007,6 +1135,7 @@ class ControllerApisTest {
@AfterEach
def tearDown(): Unit = {
- quotas.shutdown()
+ quotasNeverThrottleControllerMutations.shutdown()
+ quotasAlwaysThrottleControllerMutations.shutdown()
}
}
diff --git
a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
index 861ab321818..76617426f97 100755
--- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
@@ -33,8 +33,9 @@ import org.apache.kafka.common.config.{ConfigException,
SslConfigs}
import org.apache.kafka.common.metrics.{JmxReporter, Metrics}
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.server.authorizer._
+import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.util.KafkaScheduler
-import org.apache.kafka.storage.internals.log.LogConfig
+import org.apache.kafka.storage.internals.log.{LogConfig,
ProducerStateManagerConfig}
import org.apache.kafka.test.MockMetricsReporter
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
@@ -153,7 +154,7 @@ class DynamicBrokerConfigTest {
Mockito.when(serverMock.kafkaScheduler).thenReturn(schedulerMock)
config.dynamicConfig.initialize(None)
- config.dynamicConfig.addBrokerReconfigurable(new
DynamicThreadPool(serverMock))
+ config.dynamicConfig.addBrokerReconfigurable(new
BrokerDynamicThreadPool(serverMock))
config.dynamicConfig.addReconfigurable(acceptorMock)
val props = new Properties()
@@ -445,6 +446,32 @@ class DynamicBrokerConfigTest {
assertThrows(classOf[ConfigException], () =>
dynamicListenerConfig.validateReconfiguration(KafkaConfig(props)))
}
+ class TestAuthorizer extends Authorizer with Reconfigurable {
+ @volatile var superUsers = ""
+
+ override def start(serverInfo: AuthorizerServerInfo): util.Map[Endpoint, _
<: CompletionStage[Void]] = Map.empty.asJava
+
+ override def authorize(requestContext: AuthorizableRequestContext,
actions: util.List[Action]): util.List[AuthorizationResult] = null
+
+ override def createAcls(requestContext: AuthorizableRequestContext,
aclBindings: util.List[AclBinding]): util.List[_ <:
CompletionStage[AclCreateResult]] = null
+
+ override def deleteAcls(requestContext: AuthorizableRequestContext,
aclBindingFilters: util.List[AclBindingFilter]): util.List[_ <:
CompletionStage[AclDeleteResult]] = null
+
+ override def acls(filter: AclBindingFilter): lang.Iterable[AclBinding] =
null
+
+ override def close(): Unit = {}
+
+ override def configure(configs: util.Map[String, _]): Unit = {}
+
+ override def reconfigurableConfigs(): util.Set[String] =
Set("super.users").asJava
+
+ override def validateReconfiguration(configs: util.Map[String, _]): Unit =
{}
+
+ override def reconfigure(configs: util.Map[String, _]): Unit = {
+ superUsers = configs.get("super.users").toString
+ }
+ }
+
@Test
def testAuthorizerConfig(): Unit = {
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port
= 9092)
@@ -452,33 +479,119 @@ class DynamicBrokerConfigTest {
oldConfig.dynamicConfig.initialize(None)
val kafkaServer: KafkaServer = mock(classOf[kafka.server.KafkaServer])
-
- class TestAuthorizer extends Authorizer with Reconfigurable {
- @volatile var superUsers = ""
- override def start(serverInfo: AuthorizerServerInfo): util.Map[Endpoint,
_ <: CompletionStage[Void]] = Map.empty.asJava
- override def authorize(requestContext: AuthorizableRequestContext,
actions: util.List[Action]): util.List[AuthorizationResult] = null
- override def createAcls(requestContext: AuthorizableRequestContext,
aclBindings: util.List[AclBinding]): util.List[_ <:
CompletionStage[AclCreateResult]] = null
- override def deleteAcls(requestContext: AuthorizableRequestContext,
aclBindingFilters: util.List[AclBindingFilter]): util.List[_ <:
CompletionStage[AclDeleteResult]] = null
- override def acls(filter: AclBindingFilter): lang.Iterable[AclBinding] =
null
- override def close(): Unit = {}
- override def configure(configs: util.Map[String, _]): Unit = {}
- override def reconfigurableConfigs(): util.Set[String] =
Set("super.users").asJava
- override def validateReconfiguration(configs: util.Map[String, _]): Unit
= {}
- override def reconfigure(configs: util.Map[String, _]): Unit = {
- superUsers = configs.get("super.users").toString
- }
- }
+ when(kafkaServer.config).thenReturn(oldConfig)
+
when(kafkaServer.kafkaYammerMetrics).thenReturn(KafkaYammerMetrics.INSTANCE)
+ val metrics: Metrics = mock(classOf[Metrics])
+ when(kafkaServer.metrics).thenReturn(metrics)
+ val quotaManagers: QuotaFactory.QuotaManagers =
mock(classOf[QuotaFactory.QuotaManagers])
+ when(quotaManagers.clientQuotaCallback).thenReturn(None)
+ when(kafkaServer.quotaManagers).thenReturn(quotaManagers)
+ val socketServer: SocketServer = mock(classOf[SocketServer])
+
when(socketServer.reconfigurableConfigs).thenReturn(SocketServer.ReconfigurableConfigs)
+ when(kafkaServer.socketServer).thenReturn(socketServer)
+ val logManager: LogManager = mock(classOf[LogManager])
+ val producerStateManagerConfig: ProducerStateManagerConfig =
mock(classOf[ProducerStateManagerConfig])
+
when(logManager.producerStateManagerConfig).thenReturn(producerStateManagerConfig)
+ when(kafkaServer.logManager).thenReturn(logManager)
val authorizer = new TestAuthorizer
- when(kafkaServer.config).thenReturn(oldConfig)
when(kafkaServer.authorizer).thenReturn(Some(authorizer))
- // We are only testing authorizer reconfiguration, ignore any exceptions
due to incomplete mock
- assertThrows(classOf[Throwable], () =>
kafkaServer.config.dynamicConfig.addReconfigurables(kafkaServer))
+
+ kafkaServer.config.dynamicConfig.addReconfigurables(kafkaServer)
props.put("super.users", "User:admin")
kafkaServer.config.dynamicConfig.updateBrokerConfig(0, props)
assertEquals("User:admin", authorizer.superUsers)
}
+ private def createCombinedControllerConfig(
+ nodeId: Int,
+ port: Int
+ ): Properties = {
+ val retval = TestUtils.createBrokerConfig(nodeId,
+ zkConnect = null,
+ enableControlledShutdown = true,
+ enableDeleteTopic = true,
+ port)
+ retval.put(KafkaConfig.ProcessRolesProp, "broker,controller")
+ retval.put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
+ retval.put(KafkaConfig.ListenersProp,
s"${retval.get(KafkaConfig.ListenersProp)},CONTROLLER://localhost:0")
+ retval.put(KafkaConfig.QuorumVotersProp, s"${nodeId}@localhost:0")
+ retval
+ }
+
+ @Test
+ def testCombinedControllerAuthorizerConfig(): Unit = {
+ val props = createCombinedControllerConfig(0, 9092)
+ val oldConfig = KafkaConfig.fromProps(props)
+ oldConfig.dynamicConfig.initialize(None)
+
+ val controllerServer: ControllerServer =
mock(classOf[kafka.server.ControllerServer])
+ when(controllerServer.config).thenReturn(oldConfig)
+
when(controllerServer.kafkaYammerMetrics).thenReturn(KafkaYammerMetrics.INSTANCE)
+ val metrics: Metrics = mock(classOf[Metrics])
+ when(controllerServer.metrics).thenReturn(metrics)
+ val quotaManagers: QuotaFactory.QuotaManagers =
mock(classOf[QuotaFactory.QuotaManagers])
+ when(quotaManagers.clientQuotaCallback).thenReturn(None)
+ when(controllerServer.quotaManagers).thenReturn(quotaManagers)
+ val socketServer: SocketServer = mock(classOf[SocketServer])
+
when(socketServer.reconfigurableConfigs).thenReturn(SocketServer.ReconfigurableConfigs)
+ when(controllerServer.socketServer).thenReturn(socketServer)
+
+ val authorizer = new TestAuthorizer
+ when(controllerServer.authorizer).thenReturn(Some(authorizer))
+
+ controllerServer.config.dynamicConfig.addReconfigurables(controllerServer)
+ props.put("super.users", "User:admin")
+ controllerServer.config.dynamicConfig.updateBrokerConfig(0, props)
+ assertEquals("User:admin", authorizer.superUsers)
+ }
+
+ private def createIsolatedControllerConfig(
+ nodeId: Int,
+ port: Int
+ ): Properties = {
+ val retval = TestUtils.createBrokerConfig(nodeId,
+ zkConnect = null,
+ enableControlledShutdown = true,
+ enableDeleteTopic = true,
+ port
+ )
+ retval.put(KafkaConfig.ProcessRolesProp, "controller")
+ retval.remove(KafkaConfig.AdvertisedListenersProp)
+
+ retval.put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
+ retval.put(KafkaConfig.ListenersProp, "CONTROLLER://localhost:0")
+ retval.put(KafkaConfig.QuorumVotersProp, s"${nodeId}@localhost:0")
+ retval
+ }
+
+ @Test
+ def testIsolatedControllerAuthorizerConfig(): Unit = {
+ val props = createIsolatedControllerConfig(0, port = 9092)
+ val oldConfig = KafkaConfig.fromProps(props)
+ oldConfig.dynamicConfig.initialize(None)
+
+ val controllerServer: ControllerServer =
mock(classOf[kafka.server.ControllerServer])
+ when(controllerServer.config).thenReturn(oldConfig)
+
when(controllerServer.kafkaYammerMetrics).thenReturn(KafkaYammerMetrics.INSTANCE)
+ val metrics: Metrics = mock(classOf[Metrics])
+ when(controllerServer.metrics).thenReturn(metrics)
+ val quotaManagers: QuotaFactory.QuotaManagers =
mock(classOf[QuotaFactory.QuotaManagers])
+ when(quotaManagers.clientQuotaCallback).thenReturn(None)
+ when(controllerServer.quotaManagers).thenReturn(quotaManagers)
+ val socketServer: SocketServer = mock(classOf[SocketServer])
+
when(socketServer.reconfigurableConfigs).thenReturn(SocketServer.ReconfigurableConfigs)
+ when(controllerServer.socketServer).thenReturn(socketServer)
+
+ val authorizer = new TestAuthorizer
+ when(controllerServer.authorizer).thenReturn(Some(authorizer))
+
+ controllerServer.config.dynamicConfig.addReconfigurables(controllerServer)
+ props.put("super.users", "User:admin")
+ controllerServer.config.dynamicConfig.updateBrokerConfig(0, props)
+ assertEquals("User:admin", authorizer.superUsers)
+ }
+
@Test
def testSynonyms(): Unit = {
assertEquals(List("listener.name.secure.ssl.keystore.type",
"ssl.keystore.type"),
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/ControllerRequestContext.java
b/metadata/src/main/java/org/apache/kafka/controller/ControllerRequestContext.java
index e4bc2f3eb4a..5585f172646 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/ControllerRequestContext.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/ControllerRequestContext.java
@@ -17,13 +17,14 @@
package org.apache.kafka.controller;
-
+import org.apache.kafka.common.errors.ThrottlingQuotaExceededException;
import org.apache.kafka.common.message.RequestHeaderData;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
import java.util.OptionalLong;
+import java.util.function.Consumer;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
@@ -42,19 +43,39 @@ public class ControllerRequestContext {
private final OptionalLong deadlineNs;
private final RequestHeaderData requestHeader;
+ private final Consumer<Integer> partitionChangeQuotaApplier;
+
public ControllerRequestContext(
RequestHeaderData requestHeader,
KafkaPrincipal principal,
OptionalLong deadlineNs
+ ) {
+ this(requestHeader, principal, deadlineNs, __ -> { });
+ }
+
+ public ControllerRequestContext(
+ RequestHeaderData requestHeader,
+ KafkaPrincipal principal,
+ OptionalLong deadlineNs,
+ Consumer<Integer> partitionChangeQuotaApplier
) {
this.requestHeader = requestHeader;
this.principal = principal;
this.deadlineNs = deadlineNs;
+ this.partitionChangeQuotaApplier = partitionChangeQuotaApplier;
}
public ControllerRequestContext(
AuthorizableRequestContext requestContext,
OptionalLong deadlineNs
+ ) {
+ this(requestContext, deadlineNs, __ -> { });
+ }
+
+ public ControllerRequestContext(
+ AuthorizableRequestContext requestContext,
+ OptionalLong deadlineNs,
+ Consumer<Integer> partitionChangeQuotaApplier
) {
this(
new RequestHeaderData()
@@ -63,7 +84,8 @@ public class ControllerRequestContext {
.setCorrelationId(requestContext.correlationId())
.setClientId(requestContext.clientId()),
requestContext.principal(),
- deadlineNs
+ deadlineNs,
+ partitionChangeQuotaApplier
);
}
@@ -78,4 +100,15 @@ public class ControllerRequestContext {
public OptionalLong deadlineNs() {
return deadlineNs;
}
+
+ /**
+ * Apply the partition change quota.
+ *
+ * @param requestedPartitionCount The value to apply.
+ * @throws ThrottlingQuotaExceededException If recording this value moves
a metric beyond its configured
+ * maximum or minimum bound
+ */
+ public void applyPartitionChangeQuota(int requestedPartitionCount) {
+ partitionChangeQuotaApplier.accept(requestedPartitionCount);
+ }
}
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index ff11211e343..71a1dbc4f72 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -1802,7 +1802,7 @@ public final class QuorumController implements Controller
{
return CompletableFuture.completedFuture(new
CreateTopicsResponseData());
}
return appendWriteEvent("createTopics", context.deadlineNs(),
- () -> replicationControl.createTopics(request, describable));
+ () -> replicationControl.createTopics(context, request,
describable));
}
@Override
@@ -1852,7 +1852,7 @@ public final class QuorumController implements Controller
{
if (ids.isEmpty())
return CompletableFuture.completedFuture(Collections.emptyMap());
return appendWriteEvent("deleteTopics", context.deadlineNs(),
- () -> replicationControl.deleteTopics(ids));
+ () -> replicationControl.deleteTopics(context, ids));
}
@Override
@@ -2073,7 +2073,7 @@ public final class QuorumController implements Controller
{
}
return appendWriteEvent("createPartitions", context.deadlineNs(), ()
-> {
- final ControllerResult<List<CreatePartitionsTopicResult>> result =
replicationControl.createPartitions(topics);
+ final ControllerResult<List<CreatePartitionsTopicResult>> result =
replicationControl.createPartitions(context, topics);
if (validateOnly) {
log.debug("Validate-only CreatePartitions result(s): {}",
result.response());
return result.withoutRecords();
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index 23113e64559..2b21d5c0b22 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -32,6 +32,7 @@ import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.NoReassignmentInProgressException;
import org.apache.kafka.common.errors.PolicyViolationException;
+import org.apache.kafka.common.errors.ThrottlingQuotaExceededException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnknownTopicIdException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
@@ -522,8 +523,11 @@ public class ReplicationControlManager {
log.info("Removed topic {} with ID {}.", topic.name, record.topicId());
}
- ControllerResult<CreateTopicsResponseData>
- createTopics(CreateTopicsRequestData request, Set<String>
describable) {
+ ControllerResult<CreateTopicsResponseData> createTopics(
+ ControllerRequestContext context,
+ CreateTopicsRequestData request,
+ Set<String> describable
+ ) {
Map<String, ApiError> topicErrors = new HashMap<>();
List<ApiMessageAndVersion> records = new ArrayList<>();
@@ -562,7 +566,7 @@ public class ReplicationControlManager {
}
ApiError error;
try {
- error = createTopic(topic, records, successes, configRecords,
describable.contains(topic.name()));
+ error = createTopic(context, topic, records, successes,
configRecords, describable.contains(topic.name()));
} catch (ApiException e) {
error = ApiError.fromThrowable(e);
}
@@ -602,7 +606,8 @@ public class ReplicationControlManager {
}
}
- private ApiError createTopic(CreatableTopic topic,
+ private ApiError createTopic(ControllerRequestContext context,
+ CreatableTopic topic,
List<ApiMessageAndVersion> records,
Map<String, CreatableTopicResult> successes,
List<ApiMessageAndVersion> configRecords,
@@ -704,6 +709,14 @@ public class ReplicationControlManager {
topic.name(), numPartitions, replicationFactor, null,
creationConfigs));
if (error.isFailure()) return error;
}
+ int numPartitions = newParts.size();
+ try {
+ context.applyPartitionChangeQuota(numPartitions); // check
controller mutation quota
+ } catch (ThrottlingQuotaExceededException e) {
+ log.debug("Topic creation of {} partitions not allowed because
quota is violated. Delay time: {}",
+ numPartitions, e.throttleTimeMs());
+ return ApiError.fromThrowable(e);
+ }
Uuid topicId = Uuid.randomUuid();
CreatableTopicResult result = new CreatableTopicResult().
setName(topic.name()).
@@ -724,7 +737,7 @@ public class ReplicationControlManager {
setConfigSource(KafkaConfigSchema.translateConfigSource(entry.source()).id()).
setIsSensitive(entry.isSensitive()));
}
- result.setNumPartitions(newParts.size());
+ result.setNumPartitions(numPartitions);
result.setReplicationFactor((short)
newParts.values().iterator().next().replicas.length);
result.setTopicConfigErrorCode(NONE.code());
} else {
@@ -847,12 +860,12 @@ public class ReplicationControlManager {
return results;
}
- ControllerResult<Map<Uuid, ApiError>> deleteTopics(Collection<Uuid> ids) {
+ ControllerResult<Map<Uuid, ApiError>>
deleteTopics(ControllerRequestContext context, Collection<Uuid> ids) {
Map<Uuid, ApiError> results = new HashMap<>(ids.size());
List<ApiMessageAndVersion> records = new ArrayList<>(ids.size());
for (Uuid id : ids) {
try {
- deleteTopic(id, records);
+ deleteTopic(context, id, records);
results.put(id, ApiError.NONE);
} catch (ApiException e) {
results.put(id, ApiError.fromThrowable(e));
@@ -864,11 +877,20 @@ public class ReplicationControlManager {
return ControllerResult.atomicOf(records, results);
}
- void deleteTopic(Uuid id, List<ApiMessageAndVersion> records) {
+ void deleteTopic(ControllerRequestContext context, Uuid id,
List<ApiMessageAndVersion> records) {
TopicControlInfo topic = topics.get(id);
if (topic == null) {
throw new UnknownTopicIdException(UNKNOWN_TOPIC_ID.message());
}
+ int numPartitions = topic.parts.size();
+ try {
+ context.applyPartitionChangeQuota(numPartitions); // check
controller mutation quota
+ } catch (ThrottlingQuotaExceededException e) {
+ // log a message and rethrow the exception
+ log.debug("Topic deletion of {} partitions not allowed because
quota is violated. Delay time: {}",
+ numPartitions, e.throttleTimeMs());
+ throw e;
+ }
records.add(new ApiMessageAndVersion(new RemoveTopicRecord().
setTopicId(id), (short) 0));
}
@@ -1457,14 +1479,16 @@ public class ReplicationControlManager {
return ControllerResult.of(records, rescheduleImmidiately);
}
- ControllerResult<List<CreatePartitionsTopicResult>>
- createPartitions(List<CreatePartitionsTopic> topics) {
+ ControllerResult<List<CreatePartitionsTopicResult>> createPartitions(
+ ControllerRequestContext context,
+ List<CreatePartitionsTopic> topics
+ ) {
List<ApiMessageAndVersion> records = new ArrayList<>();
List<CreatePartitionsTopicResult> results = new ArrayList<>();
for (CreatePartitionsTopic topic : topics) {
ApiError apiError = ApiError.NONE;
try {
- createPartitions(topic, records);
+ createPartitions(context, topic, records);
} catch (ApiException e) {
apiError = ApiError.fromThrowable(e);
} catch (Exception e) {
@@ -1479,7 +1503,8 @@ public class ReplicationControlManager {
return ControllerResult.atomicOf(records, results);
}
- void createPartitions(CreatePartitionsTopic topic,
+ void createPartitions(ControllerRequestContext context,
+ CreatePartitionsTopic topic,
List<ApiMessageAndVersion> records) {
Uuid topicId = topicsByName.get(topic.name());
if (topicId == null) {
@@ -1505,6 +1530,14 @@ public class ReplicationControlManager {
" assignment(s) were specified.");
}
}
+ try {
+ context.applyPartitionChangeQuota(additional); // check controller
mutation quota
+ } catch (ThrottlingQuotaExceededException e) {
+ // log a message and rethrow the exception
+ log.debug("Partition creation of {} partitions not allowed because
quota is violated. Delay time: {}",
+ additional, e.throttleTimeMs());
+ throw e;
+ }
Iterator<PartitionRegistration> iterator =
topicInfo.parts.values().iterator();
if (!iterator.hasNext()) {
throw new UnknownServerException("Invalid state: topic " +
topic.name() +
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/ControllerRequestContextUtil.java
b/metadata/src/test/java/org/apache/kafka/controller/ControllerRequestContextUtil.java
index 8d70a2d82f5..21153ae7a98 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/ControllerRequestContextUtil.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/ControllerRequestContextUtil.java
@@ -18,6 +18,9 @@
package org.apache.kafka.controller;
import java.util.OptionalLong;
+import java.util.function.Consumer;
+
+import org.apache.kafka.common.errors.ThrottlingQuotaExceededException;
import org.apache.kafka.common.message.RequestHeaderData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
@@ -28,21 +31,37 @@ public class ControllerRequestContextUtil {
new RequestHeaderData(),
KafkaPrincipal.ANONYMOUS,
OptionalLong.empty());
+ public static final String QUOTA_EXCEEDED_IN_TEST_MSG = "Quota exceeded in
test";
public static ControllerRequestContext anonymousContextFor(ApiKeys
apiKeys) {
- return anonymousContextFor(apiKeys, apiKeys.latestVersion());
+ return anonymousContextFor(apiKeys, apiKeys.latestVersion(), __ -> {
});
+ }
+
+ public static ControllerRequestContext
anonymousContextWithMutationQuotaExceededFor(ApiKeys apiKeys) {
+ return anonymousContextFor(apiKeys, apiKeys.latestVersion(), x -> {
+ throw new
ThrottlingQuotaExceededException(QUOTA_EXCEEDED_IN_TEST_MSG);
+ });
}
public static ControllerRequestContext anonymousContextFor(
ApiKeys apiKeys,
short version
+ ) {
+ return anonymousContextFor(apiKeys, version, __ -> { });
+ }
+
+ public static ControllerRequestContext anonymousContextFor(
+ ApiKeys apiKeys,
+ short version,
+ Consumer<Integer> partitionChangeQuotaApplier
) {
return new ControllerRequestContext(
new RequestHeaderData()
.setRequestApiKey(apiKeys.id)
.setRequestApiVersion(version),
KafkaPrincipal.ANONYMOUS,
- OptionalLong.empty()
+ OptionalLong.empty(),
+ partitionChangeQuotaApplier
);
}
}
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index 76f8ffb4548..975062fbde7 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -128,9 +128,12 @@ import static
org.apache.kafka.common.protocol.Errors.NO_REASSIGNMENT_IN_PROGRES
import static org.apache.kafka.common.protocol.Errors.OPERATION_NOT_ATTEMPTED;
import static org.apache.kafka.common.protocol.Errors.POLICY_VIOLATION;
import static
org.apache.kafka.common.protocol.Errors.PREFERRED_LEADER_NOT_AVAILABLE;
+import static
org.apache.kafka.common.protocol.Errors.THROTTLING_QUOTA_EXCEEDED;
import static org.apache.kafka.common.protocol.Errors.UNKNOWN_TOPIC_ID;
import static
org.apache.kafka.common.protocol.Errors.UNKNOWN_TOPIC_OR_PARTITION;
+import static
org.apache.kafka.controller.ControllerRequestContextUtil.QUOTA_EXCEEDED_IN_TEST_MSG;
import static
org.apache.kafka.controller.ControllerRequestContextUtil.anonymousContextFor;
+import static
org.apache.kafka.controller.ControllerRequestContextUtil.anonymousContextWithMutationQuotaExceededFor;
import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -219,8 +222,9 @@ public class ReplicationControlManagerTest {
CreatableTopic topic = new CreatableTopic().setName(name);
topic.setNumPartitions(numPartitions).setReplicationFactor(replicationFactor);
request.topics().add(topic);
+ ControllerRequestContext requestContext =
anonymousContextFor(ApiKeys.CREATE_TOPICS);
ControllerResult<CreateTopicsResponseData> result =
- replicationControl.createTopics(request,
Collections.singleton(name));
+ replicationControl.createTopics(requestContext, request,
Collections.singleton(name));
CreatableTopicResult topicResult =
result.response().topics().find(name);
assertNotNull(topicResult);
assertEquals(expectedErrorCode, topicResult.errorCode());
@@ -254,8 +258,9 @@ public class ReplicationControlManagerTest {
new
CreateTopicsRequestData.CreateableTopicConfig().setName(e.getKey()).
setValue(e.getValue())));
request.topics().add(topic);
+ ControllerRequestContext requestContext =
anonymousContextFor(ApiKeys.CREATE_TOPICS);
ControllerResult<CreateTopicsResponseData> result =
- replicationControl.createTopics(request,
Collections.singleton(name));
+ replicationControl.createTopics(requestContext, request,
Collections.singleton(name));
CreatableTopicResult topicResult =
result.response().topics().find(name);
assertNotNull(topicResult);
assertEquals(expectedErrorCode, topicResult.errorCode());
@@ -267,8 +272,8 @@ public class ReplicationControlManagerTest {
return topicResult;
}
- void deleteTopic(Uuid topicId) throws Exception {
- ControllerResult<Map<Uuid, ApiError>> result =
replicationControl.deleteTopics(Collections.singleton(topicId));
+ void deleteTopic(ControllerRequestContext context, Uuid topicId)
throws Exception {
+ ControllerResult<Map<Uuid, ApiError>> result =
replicationControl.deleteTopics(context, Collections.singleton(topicId));
assertEquals(Collections.singleton(topicId),
result.response().keySet());
assertEquals(NONE, result.response().get(topicId).error());
assertEquals(1, result.records().size());
@@ -292,8 +297,9 @@ public class ReplicationControlManagerTest {
topic.assignments().add(new CreatePartitionsAssignment().
setBrokerIds(Replicas.toList(replicas[i])));
}
+ ControllerRequestContext requestContext =
anonymousContextFor(ApiKeys.CREATE_PARTITIONS);
ControllerResult<List<CreatePartitionsTopicResult>> result =
-
replicationControl.createPartitions(Collections.singletonList(topic));
+ replicationControl.createPartitions(requestContext,
Collections.singletonList(topic));
assertEquals(1, result.response().size());
CreatePartitionsTopicResult topicResult = result.response().get(0);
assertEquals(name, topicResult.name());
@@ -471,8 +477,9 @@ public class ReplicationControlManagerTest {
request.topics().add(new CreatableTopic().setName("foo").
setNumPartitions(-1).setReplicationFactor((short) -1));
+ ControllerRequestContext requestContext =
anonymousContextFor(ApiKeys.CREATE_TOPICS);
ControllerResult<CreateTopicsResponseData> result =
- replicationControl.createTopics(request,
Collections.singleton("foo"));
+ replicationControl.createTopics(requestContext, request,
Collections.singleton("foo"));
CreateTopicsResponseData expectedResponse = new
CreateTopicsResponseData();
expectedResponse.topics().add(new
CreatableTopicResult().setName("foo").
setErrorCode(INVALID_REPLICATION_FACTOR.code()).
@@ -485,7 +492,7 @@ public class ReplicationControlManagerTest {
ctx.inControlledShutdownBrokers(0);
ControllerResult<CreateTopicsResponseData> result2 =
- replicationControl.createTopics(request,
Collections.singleton("foo"));
+ replicationControl.createTopics(requestContext, request,
Collections.singleton("foo"));
CreateTopicsResponseData expectedResponse2 = new
CreateTopicsResponseData();
expectedResponse2.topics().add(new
CreatableTopicResult().setName("foo").
setErrorCode(INVALID_REPLICATION_FACTOR.code()).
@@ -497,7 +504,7 @@ public class ReplicationControlManagerTest {
ctx.unfenceBrokers(0, 1, 2);
ControllerResult<CreateTopicsResponseData> result3 =
- replicationControl.createTopics(request,
Collections.singleton("foo"));
+ replicationControl.createTopics(requestContext, request,
Collections.singleton("foo"));
CreateTopicsResponseData expectedResponse3 = new
CreateTopicsResponseData();
expectedResponse3.topics().add(new
CreatableTopicResult().setName("foo").
setNumPartitions(1).setReplicationFactor((short) 3).
@@ -510,7 +517,7 @@ public class ReplicationControlManagerTest {
replicationControl.getPartition(
((TopicRecord) result3.records().get(0).message()).topicId(),
0));
ControllerResult<CreateTopicsResponseData> result4 =
- replicationControl.createTopics(request,
Collections.singleton("foo"));
+ replicationControl.createTopics(requestContext, request,
Collections.singleton("foo"));
CreateTopicsResponseData expectedResponse4 = new
CreateTopicsResponseData();
expectedResponse4.topics().add(new
CreatableTopicResult().setName("foo").
setErrorCode(Errors.TOPIC_ALREADY_EXISTS.code()).
@@ -518,6 +525,26 @@ public class ReplicationControlManagerTest {
assertEquals(expectedResponse4, result4.response());
}
+ @Test
+ public void testCreateTopicsWithMutationQuotaExceeded() throws Exception {
+ ReplicationControlTestContext ctx = new
ReplicationControlTestContext();
+ ReplicationControlManager replicationControl = ctx.replicationControl;
+ CreateTopicsRequestData request = new CreateTopicsRequestData();
+ request.topics().add(new CreatableTopic().setName("foo").
+ setNumPartitions(-1).setReplicationFactor((short) -1));
+ ctx.registerBrokers(0, 1, 2);
+ ctx.unfenceBrokers(0, 1, 2);
+ ControllerRequestContext requestContext =
+
anonymousContextWithMutationQuotaExceededFor(ApiKeys.CREATE_TOPICS);
+ ControllerResult<CreateTopicsResponseData> result =
+ replicationControl.createTopics(requestContext, request,
Collections.singleton("foo"));
+ CreateTopicsResponseData expectedResponse = new
CreateTopicsResponseData();
+ expectedResponse.topics().add(new
CreatableTopicResult().setName("foo").
+ setErrorCode(THROTTLING_QUOTA_EXCEEDED.code()).
+ setErrorMessage(QUOTA_EXCEEDED_IN_TEST_MSG));
+ assertEquals(expectedResponse, result.response());
+ }
+
@Test
public void testCreateTopicsISRInvariants() throws Exception {
ReplicationControlTestContext ctx = new
ReplicationControlTestContext();
@@ -531,8 +558,9 @@ public class ReplicationControlManagerTest {
ctx.unfenceBrokers(0, 1);
ctx.inControlledShutdownBrokers(1);
+ ControllerRequestContext requestContext =
anonymousContextFor(ApiKeys.CREATE_TOPICS);
ControllerResult<CreateTopicsResponseData> result =
- replicationControl.createTopics(request,
Collections.singleton("foo"));
+ replicationControl.createTopics(requestContext, request,
Collections.singleton("foo"));
CreateTopicsResponseData expectedResponse = new
CreateTopicsResponseData();
expectedResponse.topics().add(new
CreatableTopicResult().setName("foo").
@@ -577,8 +605,9 @@ public class ReplicationControlManagerTest {
.setNumPartitions(-1).setReplicationFactor((short) -1)
.setConfigs(validConfigs));
+ ControllerRequestContext requestContext =
anonymousContextFor(ApiKeys.CREATE_TOPICS);
ControllerResult<CreateTopicsResponseData> result1 =
- replicationControl.createTopics(request1,
Collections.singleton("foo"));
+ replicationControl.createTopics(requestContext, request1,
Collections.singleton("foo"));
assertEquals((short) 0,
result1.response().topics().find("foo").errorCode());
List<ApiMessageAndVersion> records1 = result1.records();
@@ -611,7 +640,7 @@ public class ReplicationControlManagerTest {
.setConfigs(invalidConfigs));
ControllerResult<CreateTopicsResponseData> result2 =
- replicationControl.createTopics(request2,
Collections.singleton("bar"));
+ replicationControl.createTopics(requestContext, request2,
Collections.singleton("bar"));
assertEquals(Errors.INVALID_CONFIG.code(),
result2.response().topics().find("bar").errorCode());
assertEquals(
"Null value not supported for topic configs: foo",
@@ -624,7 +653,7 @@ public class ReplicationControlManagerTest {
.setConfigs(validConfigs));
ControllerResult<CreateTopicsResponseData> result3 =
- replicationControl.createTopics(request3,
Collections.singleton("baz"));
+ replicationControl.createTopics(requestContext, request3,
Collections.singleton("baz"));
assertEquals(INVALID_REPLICATION_FACTOR.code(),
result3.response().topics().find("baz").errorCode());
assertEquals(Collections.emptyList(), result3.records());
@@ -643,7 +672,7 @@ public class ReplicationControlManagerTest {
request4Topics.add(batchedTopic1);
request4Topics.add(batchedTopic2);
ControllerResult<CreateTopicsResponseData> result4 =
- replicationControl.createTopics(request4, request4Topics);
+ replicationControl.createTopics(requestContext, request4,
request4Topics);
assertEquals(Errors.NONE.code(),
result4.response().topics().find(batchedTopic1).errorCode());
assertEquals(INVALID_REPLICATION_FACTOR.code(),
result4.response().topics().find(batchedTopic2).errorCode());
@@ -662,19 +691,27 @@ public class ReplicationControlManagerTest {
assertEquals(batchedTopic1Record.topicId(), ((PartitionRecord)
result4.records().get(2).message()).topicId());
}
- @Test
- public void testCreateTopicsWithValidateOnlyFlag() throws Exception {
+ @ParameterizedTest(name = "testCreateTopicsWithValidateOnlyFlag with
mutationQuotaExceeded: {0}")
+ @ValueSource(booleans = {true, false})
+ public void testCreateTopicsWithValidateOnlyFlag(boolean
mutationQuotaExceeded) throws Exception {
ReplicationControlTestContext ctx = new
ReplicationControlTestContext();
ctx.registerBrokers(0, 1, 2);
ctx.unfenceBrokers(0, 1, 2);
CreateTopicsRequestData request = new
CreateTopicsRequestData().setValidateOnly(true);
request.topics().add(new CreatableTopic().setName("foo").
setNumPartitions(1).setReplicationFactor((short) 3));
+ ControllerRequestContext requestContext = mutationQuotaExceeded ?
+
anonymousContextWithMutationQuotaExceededFor(ApiKeys.CREATE_TOPICS) :
+ anonymousContextFor(ApiKeys.CREATE_TOPICS);
ControllerResult<CreateTopicsResponseData> result =
- ctx.replicationControl.createTopics(request,
Collections.singleton("foo"));
+ ctx.replicationControl.createTopics(requestContext, request,
Collections.singleton("foo"));
assertEquals(0, result.records().size());
CreatableTopicResult topicResult =
result.response().topics().find("foo");
- assertEquals((short) 0, topicResult.errorCode());
+ if (mutationQuotaExceeded) {
+ assertEquals(THROTTLING_QUOTA_EXCEEDED.code(),
topicResult.errorCode());
+ } else {
+ assertEquals(NONE.code(), topicResult.errorCode());
+ }
}
@Test
@@ -685,8 +722,9 @@ public class ReplicationControlManagerTest {
CreateTopicsRequestData request = new
CreateTopicsRequestData().setValidateOnly(true);
request.topics().add(new CreatableTopic().setName("foo").
setNumPartitions(1).setReplicationFactor((short) 4));
+ ControllerRequestContext requestContext =
anonymousContextFor(ApiKeys.CREATE_TOPICS);
ControllerResult<CreateTopicsResponseData> result =
- ctx.replicationControl.createTopics(request,
Collections.singleton("foo"));
+ ctx.replicationControl.createTopics(requestContext, request,
Collections.singleton("foo"));
assertEquals(0, result.records().size());
CreateTopicsResponseData expectedResponse = new
CreateTopicsResponseData();
expectedResponse.topics().add(new
CreatableTopicResult().setName("foo").
@@ -728,7 +766,8 @@ public class ReplicationControlManagerTest {
CreatableTopicResult initialTopic = ctx.createTestTopic("foo.bar", 2,
(short) 2, NONE.code());
assertEquals(2,
ctx.replicationControl.getTopic(initialTopic.topicId()).numPartitions(Long.MAX_VALUE));
- ctx.deleteTopic(initialTopic.topicId());
+ ControllerRequestContext requestContext =
anonymousContextFor(ApiKeys.DELETE_TOPICS);
+ ctx.deleteTopic(requestContext, initialTopic.topicId());
CreatableTopicResult recreatedTopic = ctx.createTestTopic("foo.bar",
4, (short) 2, NONE.code());
assertNotEquals(initialTopic.topicId(), recreatedTopic.topicId());
@@ -1050,8 +1089,9 @@ public class ReplicationControlManagerTest {
setConfigs(requestConfigs));
ctx.registerBrokers(0, 1);
ctx.unfenceBrokers(0, 1);
+ ControllerRequestContext createTopicsRequestContext =
anonymousContextFor(ApiKeys.CREATE_TOPICS);
ControllerResult<CreateTopicsResponseData> createResult =
- replicationControl.createTopics(request,
Collections.singleton("foo"));
+ replicationControl.createTopics(createTopicsRequestContext,
request, Collections.singleton("foo"));
CreateTopicsResponseData expectedResponse = new
CreateTopicsResponseData();
Uuid topicId = createResult.response().topics().find("foo").topicId();
expectedResponse.topics().add(new
CreatableTopicResult().setName("foo").
@@ -1082,13 +1122,14 @@ public class ReplicationControlManagerTest {
new ResultOrError<>(new ApiError(UNKNOWN_TOPIC_OR_PARTITION))),
replicationControl.findTopicIds(Long.MAX_VALUE,
Collections.singleton("bar")));
+ ControllerRequestContext deleteTopicsRequestContext =
anonymousContextFor(ApiKeys.DELETE_TOPICS);
ControllerResult<Map<Uuid, ApiError>> invalidDeleteResult =
replicationControl.
- deleteTopics(Collections.singletonList(invalidId));
+ deleteTopics(deleteTopicsRequestContext,
Collections.singletonList(invalidId));
assertEquals(0, invalidDeleteResult.records().size());
assertEquals(singletonMap(invalidId, new ApiError(UNKNOWN_TOPIC_ID,
null)),
invalidDeleteResult.response());
ControllerResult<Map<Uuid, ApiError>> deleteResult =
replicationControl.
- deleteTopics(Collections.singletonList(topicId));
+ deleteTopics(deleteTopicsRequestContext,
Collections.singletonList(topicId));
assertTrue(deleteResult.isAtomic());
assertEquals(singletonMap(topicId, new ApiError(NONE, null)),
deleteResult.response());
@@ -1107,6 +1148,33 @@ public class ReplicationControlManagerTest {
assertEmptyTopicConfigs(ctx, "foo");
}
+ @Test
+ public void testDeleteTopicsWithMutationQuotaExceeded() throws Exception {
+ ReplicationControlTestContext ctx = new
ReplicationControlTestContext();
+ ReplicationControlManager replicationControl = ctx.replicationControl;
+ CreateTopicsRequestData request = new CreateTopicsRequestData();
+ request.topics().add(new CreatableTopic().setName("foo").
+ setNumPartitions(3).setReplicationFactor((short) 2));
+ ctx.registerBrokers(0, 1);
+ ctx.unfenceBrokers(0, 1);
+ ControllerRequestContext createTopicsRequestContext =
+ anonymousContextFor(ApiKeys.CREATE_TOPICS);
+ ControllerResult<CreateTopicsResponseData> createResult =
+ replicationControl.createTopics(createTopicsRequestContext,
request, Collections.singleton("foo"));
+ CreateTopicsResponseData expectedResponse = new
CreateTopicsResponseData();
+ CreatableTopicResult createdTopic =
createResult.response().topics().find("foo");
+ assertEquals(NONE.code(), createdTopic.errorCode());
+ ctx.replay(createResult.records());
+ ControllerRequestContext deleteTopicsRequestContext =
+
anonymousContextWithMutationQuotaExceededFor(ApiKeys.DELETE_TOPICS);
+ Uuid topicId = createdTopic.topicId();
+ ControllerResult<Map<Uuid, ApiError>> deleteResult =
replicationControl.
+ deleteTopics(deleteTopicsRequestContext,
Collections.singletonList(topicId));
+ assertEquals(singletonMap(topicId, new
ApiError(THROTTLING_QUOTA_EXCEEDED, QUOTA_EXCEEDED_IN_TEST_MSG)),
+ deleteResult.response());
+ assertEquals(0, deleteResult.records().size());
+ }
+
@Test
public void testCreatePartitions() throws Exception {
ReplicationControlTestContext ctx = new
ReplicationControlTestContext();
@@ -1122,8 +1190,9 @@ public class ReplicationControlManagerTest {
setNumPartitions(2).setReplicationFactor((short) 2));
ctx.registerBrokers(0, 1);
ctx.unfenceBrokers(0, 1);
+ ControllerRequestContext requestContext =
anonymousContextFor(ApiKeys.CREATE_TOPICS);
ControllerResult<CreateTopicsResponseData> createTopicResult =
replicationControl.
- createTopics(request, new HashSet<>(Arrays.asList("foo", "bar",
"quux", "foo2")));
+ createTopics(requestContext, request, new
HashSet<>(Arrays.asList("foo", "bar", "quux", "foo2")));
ctx.replay(createTopicResult.records());
List<CreatePartitionsTopic> topics = new ArrayList<>();
topics.add(new CreatePartitionsTopic().
@@ -1135,7 +1204,7 @@ public class ReplicationControlManagerTest {
topics.add(new CreatePartitionsTopic().
setName("quux").setCount(2).setAssignments(null));
ControllerResult<List<CreatePartitionsTopicResult>>
createPartitionsResult =
- replicationControl.createPartitions(topics);
+ replicationControl.createPartitions(requestContext, topics);
assertEquals(asList(new CreatePartitionsTopicResult().
setName("foo").
setErrorCode(NONE.code()).
@@ -1168,7 +1237,7 @@ public class ReplicationControlManagerTest {
setName("foo2").setCount(3).setAssignments(asList(
new CreatePartitionsAssignment().setBrokerIds(asList(2, 0)))));
ControllerResult<List<CreatePartitionsTopicResult>>
createPartitionsResult2 =
- replicationControl.createPartitions(topics2);
+ replicationControl.createPartitions(requestContext, topics2);
assertEquals(asList(new CreatePartitionsTopicResult().
setName("foo").
setErrorCode(NONE.code()).
@@ -1192,6 +1261,45 @@ public class ReplicationControlManagerTest {
ctx.replay(createPartitionsResult2.records());
}
+ @Test
+ public void testCreatePartitionsWithMutationQuotaExceeded() throws
Exception {
+ ReplicationControlTestContext ctx = new
ReplicationControlTestContext();
+ ReplicationControlManager replicationControl = ctx.replicationControl;
+ CreateTopicsRequestData request = new CreateTopicsRequestData();
+ request.topics().add(new CreatableTopic().setName("foo").
+ setNumPartitions(3).setReplicationFactor((short) 2));
+ ctx.registerBrokers(0, 1);
+ ctx.unfenceBrokers(0, 1);
+ ControllerRequestContext createTopicsRequestContext =
+ anonymousContextFor(ApiKeys.CREATE_TOPICS);
+ ControllerResult<CreateTopicsResponseData> createResult =
+ replicationControl.createTopics(createTopicsRequestContext,
request, Collections.singleton("foo"));
+ CreateTopicsResponseData expectedResponse = new
CreateTopicsResponseData();
+ CreatableTopicResult createdTopic =
createResult.response().topics().find("foo");
+ assertEquals(NONE.code(), createdTopic.errorCode());
+ ctx.replay(createResult.records());
+ List<CreatePartitionsTopic> topics = new ArrayList<>();
+ topics.add(new CreatePartitionsTopic().
+ setName("foo").setCount(5).setAssignments(null));
+ ControllerRequestContext createPartitionsRequestContext =
+
anonymousContextWithMutationQuotaExceededFor(ApiKeys.CREATE_PARTITIONS);
+ ControllerResult<List<CreatePartitionsTopicResult>>
createPartitionsResult =
+
replicationControl.createPartitions(createPartitionsRequestContext, topics);
+ List<CreatePartitionsTopicResult> expectedThrottled =
singletonList(new CreatePartitionsTopicResult().
+ setName("foo").
+ setErrorCode(THROTTLING_QUOTA_EXCEEDED.code()).
+ setErrorMessage(QUOTA_EXCEEDED_IN_TEST_MSG));
+ assertEquals(expectedThrottled, createPartitionsResult.response());
+ // now test the explicit assignment case
+ List<CreatePartitionsTopic> topics2 = new ArrayList<>();
+ topics2.add(new CreatePartitionsTopic().
+ setName("foo").setCount(4).setAssignments(asList(
+ new CreatePartitionsAssignment().setBrokerIds(asList(1, 0)))));
+ ControllerResult<List<CreatePartitionsTopicResult>>
createPartitionsResult2 =
+
replicationControl.createPartitions(createPartitionsRequestContext, topics2);
+ assertEquals(expectedThrottled, createPartitionsResult2.response());
+ }
+
@Test
public void
testCreatePartitionsFailsWhenAllBrokersAreFencedOrInControlledShutdown() throws
Exception {
ReplicationControlTestContext ctx = new
ReplicationControlTestContext();
@@ -1203,8 +1311,10 @@ public class ReplicationControlManagerTest {
ctx.registerBrokers(0, 1);
ctx.unfenceBrokers(0, 1);
+ ControllerRequestContext requestContext =
+ anonymousContextFor(ApiKeys.CREATE_TOPICS);
ControllerResult<CreateTopicsResponseData> createTopicResult =
replicationControl.
- createTopics(request, new HashSet<>(Arrays.asList("foo")));
+ createTopics(requestContext, request, new
HashSet<>(Arrays.asList("foo")));
ctx.replay(createTopicResult.records());
ctx.registerBrokers(0, 1);
@@ -1215,7 +1325,7 @@ public class ReplicationControlManagerTest {
topics.add(new CreatePartitionsTopic().
setName("foo").setCount(2).setAssignments(null));
ControllerResult<List<CreatePartitionsTopicResult>>
createPartitionsResult =
- replicationControl.createPartitions(topics);
+ replicationControl.createPartitions(requestContext, topics);
assertEquals(
asList(new CreatePartitionsTopicResult().
@@ -1239,15 +1349,16 @@ public class ReplicationControlManagerTest {
ctx.unfenceBrokers(0, 1);
ctx.inControlledShutdownBrokers(1);
+ ControllerRequestContext requestContext =
anonymousContextFor(ApiKeys.CREATE_TOPICS);
ControllerResult<CreateTopicsResponseData> result =
- replicationControl.createTopics(request,
Collections.singleton("foo"));
+ replicationControl.createTopics(requestContext, request,
Collections.singleton("foo"));
ctx.replay(result.records());
List<CreatePartitionsTopic> topics = asList(new
CreatePartitionsTopic().
setName("foo").setCount(2).setAssignments(null));
ControllerResult<List<CreatePartitionsTopicResult>>
createPartitionsResult =
- replicationControl.createPartitions(topics);
+ replicationControl.createPartitions(requestContext, topics);
ctx.replay(createPartitionsResult.records());
// Broker 2 cannot be in the ISR because it is fenced and broker 1
diff --git a/tests/kafkatest/tests/core/controller_mutation_quota_test.py
b/tests/kafkatest/tests/core/controller_mutation_quota_test.py
new file mode 100644
index 00000000000..b5578ce857e
--- /dev/null
+++ b/tests/kafkatest/tests/core/controller_mutation_quota_test.py
@@ -0,0 +1,142 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.mark.resource import cluster
+from ducktape.mark import matrix
+from ducktape.tests.test import Test
+
+from kafkatest.services.trogdor.produce_bench_workload import
ProduceBenchWorkloadService, ProduceBenchWorkloadSpec
+from kafkatest.services.trogdor.consume_bench_workload import
ConsumeBenchWorkloadService, ConsumeBenchWorkloadSpec
+from kafkatest.services.trogdor.task_spec import TaskSpec
+from kafkatest.services.kafka import KafkaService, quorum
+from kafkatest.services.trogdor.trogdor import TrogdorService
+from kafkatest.services.zookeeper import ZookeeperService
+
+import time
+
+
+class ControllerMutationQuotaTest(Test):
+ """Tests throttled partition changes via the kafka-topics CLI as follows:
+
+ 1. Create a topic with N partitions (creating N partitions)
+ 2. Alter the number of topic partitions to 2N (adding N partitions)
+ 3. Delete topic (removing 2N partitions)
+ 4. Create topic with 1 partition
+
+ The mutation quota should be checked at the beginning of each operation,
+ so the checks are expected to be as follows assuming 1 window for the
entire set of operations:
+
+ Before step 2: alter will be allowed if quota > N
+ Before step 3: delete will be allowed if quota > 2N
+ Before step 4: create will be allowed if quota > 4N
+
+ Therefore, if we want steps 1-3 to succeed and step 4 to fail we need
2N < quota < 4N
+
+ For example, we can achieve the desired behavior (steps 1-3 succeed, step
4 fails)
+ for N = 10 partitions by setting the following configs:
+
+ controller.quota.window.num=10
+ controller.quota.window.size.seconds=200
+ controller_mutation_rate=X where (20 < 2000X < 40), therefore (0.01 < X <
0.02)
+ """
+ def __init__(self, test_context):
+ super(ControllerMutationQuotaTest,
self).__init__(test_context=test_context)
+ self.test_context = test_context
+ self.zk = ZookeeperService(test_context, num_nodes=1) if
quorum.for_test(test_context) == quorum.zk else None
+ self.window_num = 10
+ self.window_size_seconds = 200 # must be long enough such that all CLI
commands fit into it
+
+ self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk,
+ server_prop_overrides=[
+ ["quota.window.num", "%s" % self.window_num],
+ ["controller.quota.window.size.seconds", "%s" %
self.window_size_seconds]
+ ],
+ controller_num_nodes_override=1)
+
+ def setUp(self):
+ if self.zk:
+ self.zk.start()
+ self.kafka.start()
+
+ def teardown(self):
+ # Need to increase the timeout due to partition count
+ self.kafka.stop()
+ if self.zk:
+ self.zk.stop()
+
+ @cluster(num_nodes=2)
+ @matrix(metadata_quorum=quorum.all)
+ def test_controller_mutation_quota(self, metadata_quorum=quorum.zk):
+ self.partition_count = 10
+ mutation_rate = 3 * self.partition_count / (self.window_num *
self.window_size_seconds)
+
+ # first invoke the steps with no mutation quota to ensure it succeeds
in general
+ self.mutate_partitions("topic_succeed", True)
+
+ # now apply the mutation quota configs
+ node = self.kafka.nodes[0]
+ alter_mutation_quota_cmd = "%s --entity-type users --entity-default
--alter --add-config 'controller_mutation_rate=%f'" % \
+
(self.kafka.kafka_configs_cmd_with_optional_security_settings(node,
force_use_zk_connection=False), mutation_rate)
+ node.account.ssh(alter_mutation_quota_cmd)
+
+ # now invoke the same steps with the mutation quota applied to ensure
it fails
+ self.mutate_partitions("topic_fail", False)
+
+ def mutate_partitions(self, topic_prefix, expected_to_succeed):
+ try:
+ # create the topic
+ topic_name = topic_prefix
+ topic_cfg = {
+ "topic": topic_name,
+ "partitions": self.partition_count,
+ "replication-factor": 1,
+ }
+ self.kafka.create_topic(topic_cfg)
+
+ # alter the partition count
+ node = self.kafka.nodes[0]
+ alter_topic_cmd = "%s --alter --topic %s --partitions %i" % \
+
(self.kafka.kafka_topics_cmd_with_optional_security_settings(node,
force_use_zk_connection=False),
+ topic_name, 2 * self.partition_count)
+ node.account.ssh(alter_topic_cmd)
+
+ # delete the topic
+ self.kafka.delete_topic(topic_name)
+ except RuntimeError as e:
+ fail_msg = "Failure: Unexpected Exception - %s" % str(e)
+ self.logger.error(fail_msg)
+ raise RuntimeError(fail_msg)
+
+ # create a second topic with a single partition
+ topic_cfg = {
+ "topic": "%s_b" % topic_prefix,
+ "partitions": 1,
+ "replication-factor": 1,
+ }
+ fail_msg = ""
+ try:
+ self.kafka.create_topic(topic_cfg)
+ if not expected_to_succeed:
+ fail_msg = "Failure: Did not encounter controller mutation
quota violation when one was expected"
+ else:
+ self.logger.info("Success: Encountered no quota violation as
expected")
+ except RuntimeError as e:
+ if expected_to_succeed:
+ fail_msg = "Failure: Unexpected Exception - %s" % str(e)
+ else:
+ self.logger.info("Success: Encountered quota violation
exception as expected")
+ if fail_msg:
+ self.logger.error(fail_msg)
+ raise RuntimeError(fail_msg)
\ No newline at end of file