This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e3817cac894 KAFKA-14351: Controller Mutation Quota for KRaft (#13116)
e3817cac894 is described below

commit e3817cac894140e0b26c09050b10ce653f7c3d97
Author: Ron Dagostino <[email protected]>
AuthorDate: Tue Mar 7 14:25:34 2023 -0500

    KAFKA-14351: Controller Mutation Quota for KRaft (#13116)
    
    Implement KIP-599 controller mutation quotas for the KRaft controller. 
These quotas apply to create
    topics, create partitions, and delete topic operations. They are specified 
in terms of number of
    partitions.
    
    The approach taken here is to reuse the ControllerMutationQuotaManager that 
is also used in ZK
    mode. The quotas are implemented as Sensor objects and Sensor.checkQuotas 
enforces the quota,
    whereas Sensor.record notes that new partitions have been modified. While 
ControllerApis handles
    fetching the Sensor objects, we must make the final callback to check the 
quotas from within
    QuorumController. The reason is that only QuorumController knows the 
final number of partitions
    that must be modified. (As one example, up-to-date information about the 
number of partitions that
    will be deleted when a topic is deleted is really only available in 
QuorumController.)
    
    For quota enforcement, the logic is already in place. The KRaft controller 
is expected to set the
    throttle time in the response that is embedded in EnvelopeResponse, but it 
does not actually apply
    the throttle because there is no client connection to throttle. Instead, 
the broker that forwarded
    the request is expected to return the throttle value from the controller 
and to throttle the client
    connection. It also applies its own request quota, so the enforced/returned 
throttle time is the maximum of
    the two.
    
    This PR also installs a DynamicConfigPublisher in ControllerServer. This 
allows dynamic
    configurations to be published on the controller. Previously, they could be 
set, but they were not
    applied. Note that we still don't have a good way to set node-level 
configurations for isolated
    controllers. However, this will allow us to set cluster configs (aka 
default node configs) and have
    them take effect on the controllers.
    
    In a similar vein, this PR separates out the dynamic client quota publisher 
logic used on the
    broker into DynamicClientQuotaPublisher. We can now install this on both 
BrokerServer and
    ControllerServer. This makes dynamically configuring quotas (such as 
controller mutation quotas)
    possible.
    
    Also add a ducktape test, controller_mutation_quota_test.py.
    
    Reviewers: David Jacot <[email protected]>, Ismael Juma 
<[email protected]>, Colin P. McCabe <[email protected]>
---
 .../main/scala/kafka/server/ControllerApis.scala   |  27 +++-
 .../main/scala/kafka/server/ControllerServer.scala |  39 +++--
 .../scala/kafka/server/DynamicBrokerConfig.scala   |  99 ++++++++----
 .../metadata/DynamicClientQuotaPublisher.scala     |  82 ++++++++++
 .../server/metadata/DynamicConfigPublisher.scala   |  43 +++++-
 core/src/test/java/kafka/test/MockController.java  |  84 +++++++---
 .../kafka/server/KRaftClusterTest.scala            |  89 ++++++++++-
 .../unit/kafka/server/ControllerApisTest.scala     | 141 ++++++++++++++++-
 .../kafka/server/DynamicBrokerConfigTest.scala     | 155 ++++++++++++++++---
 .../kafka/controller/ControllerRequestContext.java |  37 ++++-
 .../apache/kafka/controller/QuorumController.java  |   6 +-
 .../controller/ReplicationControlManager.java      |  57 +++++--
 .../controller/ControllerRequestContextUtil.java   |  23 ++-
 .../controller/ReplicationControlManagerTest.java  | 171 +++++++++++++++++----
 .../tests/core/controller_mutation_quota_test.py   | 142 +++++++++++++++++
 15 files changed, 1040 insertions(+), 155 deletions(-)

diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala 
b/core/src/main/scala/kafka/server/ControllerApis.scala
index 0964c07215c..b8f233fa94f 100644
--- a/core/src/main/scala/kafka/server/ControllerApis.scala
+++ b/core/src/main/scala/kafka/server/ControllerApis.scala
@@ -17,10 +17,11 @@
 
 package kafka.server
 
-import java.util
+import java.{lang, util}
 import java.util.{Collections, OptionalLong}
 import java.util.Map.Entry
 import java.util.concurrent.CompletableFuture
+import java.util.function.Consumer
 import kafka.network.RequestChannel
 import kafka.raft.RaftManager
 import kafka.server.QuotaFactory.QuotaManagers
@@ -175,8 +176,10 @@ class ControllerApis(val requestChannel: RequestChannel,
 
   def handleDeleteTopics(request: RequestChannel.Request): 
CompletableFuture[Unit] = {
     val deleteTopicsRequest = request.body[DeleteTopicsRequest]
+    val controllerMutationQuota = 
quotas.controllerMutation.newQuotaFor(request, strictSinceVersion = 5)
     val context = new ControllerRequestContext(request.context.header.data, 
request.context.principal,
-      requestTimeoutMsToDeadlineNs(time, deleteTopicsRequest.data.timeoutMs))
+      requestTimeoutMsToDeadlineNs(time, deleteTopicsRequest.data.timeoutMs),
+      controllerMutationQuotaRecorderFor(controllerMutationQuota))
     val future = deleteTopics(context,
       deleteTopicsRequest.data,
       request.context.apiVersion,
@@ -184,7 +187,7 @@ class ControllerApis(val requestChannel: RequestChannel,
       names => authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, 
names)(n => n),
       names => authHelper.filterByAuthorized(request.context, DELETE, TOPIC, 
names)(n => n))
     future.handle[Unit] { (results, exception) =>
-      requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
+      
requestHelper.sendResponseMaybeThrottleWithControllerQuota(controllerMutationQuota,
 request, throttleTimeMs => {
         if (exception != null) {
           deleteTopicsRequest.getErrorResponse(throttleTimeMs, exception)
         } else {
@@ -339,8 +342,10 @@ class ControllerApis(val requestChannel: RequestChannel,
 
   def handleCreateTopics(request: RequestChannel.Request): 
CompletableFuture[Unit] = {
     val createTopicsRequest = request.body[CreateTopicsRequest]
+    val controllerMutationQuota = 
quotas.controllerMutation.newQuotaFor(request, strictSinceVersion = 6)
     val context = new ControllerRequestContext(request.context.header.data, 
request.context.principal,
-      requestTimeoutMsToDeadlineNs(time, createTopicsRequest.data.timeoutMs))
+      requestTimeoutMsToDeadlineNs(time, createTopicsRequest.data.timeoutMs),
+      controllerMutationQuotaRecorderFor(controllerMutationQuota))
     val future = createTopics(context,
         createTopicsRequest.data,
         authHelper.authorize(request.context, CREATE, CLUSTER, CLUSTER_NAME, 
logIfDenied = false),
@@ -348,7 +353,7 @@ class ControllerApis(val requestChannel: RequestChannel,
         names => authHelper.filterByAuthorized(request.context, 
DESCRIBE_CONFIGS, TOPIC,
             names, logIfDenied = false)(identity))
     future.handle[Unit] { (result, exception) =>
-      requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
+      
requestHelper.sendResponseMaybeThrottleWithControllerQuota(controllerMutationQuota,
 request, throttleTimeMs => {
         if (exception != null) {
           createTopicsRequest.getErrorResponse(throttleTimeMs, exception)
         } else {
@@ -359,6 +364,12 @@ class ControllerApis(val requestChannel: RequestChannel,
     }
   }
 
+  private def controllerMutationQuotaRecorderFor(controllerMutationQuota: 
ControllerMutationQuota) = {
+    new Consumer[lang.Integer]() {
+      override def accept(permits: lang.Integer): Unit = 
controllerMutationQuota.record(permits.doubleValue())
+    }
+  }
+
   def createTopics(
     context: ControllerRequestContext,
     request: CreateTopicsRequestData,
@@ -748,8 +759,10 @@ class ControllerApis(val requestChannel: RequestChannel,
       authHelper.filterByAuthorized(request.context, ALTER, TOPIC, topics)(n 
=> n)
     }
     val createPartitionsRequest = request.body[CreatePartitionsRequest]
+    val controllerMutationQuota = 
quotas.controllerMutation.newQuotaFor(request, strictSinceVersion = 3)
     val context = new ControllerRequestContext(request.context.header.data, 
request.context.principal,
-      requestTimeoutMsToDeadlineNs(time, 
createPartitionsRequest.data.timeoutMs))
+      requestTimeoutMsToDeadlineNs(time, 
createPartitionsRequest.data.timeoutMs),
+      controllerMutationQuotaRecorderFor(controllerMutationQuota))
     val future = createPartitions(context,
       createPartitionsRequest.data(),
       filterAlterAuthorizedTopics)
@@ -757,7 +770,7 @@ class ControllerApis(val requestChannel: RequestChannel,
       if (exception != null) {
         requestHelper.handleError(request, exception)
       } else {
-        requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
+        
requestHelper.sendResponseMaybeThrottleWithControllerQuota(controllerMutationQuota,
 request, requestThrottleMs => {
           val responseData = new CreatePartitionsResponseData().
             setResults(responses).
             setThrottleTimeMs(requestThrottleMs)
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala 
b/core/src/main/scala/kafka/server/ControllerServer.scala
index 908e26a3020..b1a15d97741 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -24,8 +24,10 @@ import kafka.network.{DataPlaneAcceptor, SocketServer}
 import kafka.raft.KafkaRaftManager
 import kafka.security.CredentialProvider
 import kafka.server.KafkaConfig.{AlterConfigPolicyClassNameProp, 
CreateTopicPolicyClassNameProp}
-import kafka.server.KafkaRaftServer.BrokerRole
 import kafka.server.QuotaFactory.QuotaManagers
+
+import scala.collection.immutable
+import kafka.server.metadata.{ClientQuotaMetadataManager, 
DynamicClientQuotaPublisher, DynamicConfigPublisher}
 import kafka.utils.{CoreUtils, Logging}
 import kafka.zk.{KafkaZkClient, ZkMigrationClient}
 import org.apache.kafka.common.config.ConfigException
@@ -102,9 +104,11 @@ class ControllerServer(
   var alterConfigPolicy: Option[AlterConfigPolicy] = None
   var controller: Controller = _
   var quotaManagers: QuotaManagers = _
+  var clientQuotaMetadataManager: ClientQuotaMetadataManager = _
   var controllerApis: ControllerApis = _
   var controllerApisHandlerPool: KafkaRequestHandlerPool = _
   var migrationSupport: Option[ControllerMigrationSupport] = None
+  def kafkaYammerMetrics: KafkaYammerMetrics = KafkaYammerMetrics.INSTANCE
 
   private def maybeChangeStatus(from: ProcessStatus, to: ProcessStatus): 
Boolean = {
     lock.lock()
@@ -118,13 +122,6 @@ class ControllerServer(
     true
   }
 
-  private def doRemoteKraftSetup(): Unit = {
-    // Explicitly configure metric reporters on this remote controller.
-    // We do not yet support dynamic reconfiguration on remote controllers in 
general;
-    // remove this once that is implemented.
-    new DynamicMetricReporterState(config.nodeId, config, metrics, clusterId)
-  }
-
   def clusterId: String = sharedServer.metaProps.clusterId
 
   def startup(): Unit = {
@@ -243,11 +240,6 @@ class ControllerServer(
       }
       controller = controllerBuilder.build()
 
-      // Perform any setup that is done only when this node is a 
controller-only node.
-      if (!config.processRoles.contains(BrokerRole)) {
-        doRemoteKraftSetup()
-      }
-
       if (config.migrationEnabled) {
         val zkClient = KafkaZkClient.createZkClient("KRaft Migration", time, 
config, KafkaServer.zkClientConfigFromKafkaConfig(config))
         val migrationClient = new ZkMigrationClient(zkClient)
@@ -272,6 +264,7 @@ class ControllerServer(
         metrics,
         time,
         threadNamePrefix)
+      clientQuotaMetadataManager = new 
ClientQuotaMetadataManager(quotaManagers, socketServer.connectionQuotas)
       controllerApis = new ControllerApis(socketServer.dataPlaneRequestChannel,
         authorizer,
         quotaManagers,
@@ -309,6 +302,26 @@ class ControllerServer(
       FutureUtils.waitWithLogging(logger.underlying, "all of the SocketServer 
Acceptors to be started",
         socketServerFuture, startupDeadline, time)
 
+      // register this instance for dynamic config changes to the KafkaConfig
+      config.dynamicConfig.addReconfigurables(this)
+      // We must install the below publisher and receive the changes when we 
are also running the broker role
+      // because we don't share a single KafkaConfig instance with the broker, 
and therefore
+      // the broker's DynamicConfigPublisher won't take care of the changes 
for us.
+      val dynamicConfigHandlers = immutable.Map[String, ConfigHandler](
+        // controllers don't host topics, so no need to do anything with 
dynamic topic config changes here
+        ConfigType.Broker -> new BrokerConfigHandler(config, quotaManagers))
+      val dynamicConfigPublisher = new DynamicConfigPublisher(
+        config,
+        sharedServer.metadataPublishingFaultHandler,
+        dynamicConfigHandlers,
+        "controller")
+      val dynamicClientQuotaPublisher = new DynamicClientQuotaPublisher(
+        config,
+        sharedServer.metadataPublishingFaultHandler,
+        "controller",
+        clientQuotaMetadataManager)
+      FutureUtils.waitWithLogging(logger.underlying, "all of the dynamic 
config and client quota publishers to be installed",
+        sharedServer.loader.installPublishers(List(dynamicConfigPublisher, 
dynamicClientQuotaPublisher).asJava), startupDeadline, time)
     } catch {
       case e: Throwable =>
         maybeChangeStatus(STARTING, STARTED)
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala 
b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index b924648c691..940580d155b 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -25,6 +25,7 @@ import kafka.cluster.EndPoint
 import kafka.log.{LogCleaner, LogManager}
 import kafka.network.{DataPlaneAcceptor, SocketServer}
 import kafka.server.DynamicBrokerConfig._
+import kafka.server.KafkaRaftServer.BrokerRole
 import kafka.utils.{CoreUtils, Logging, PasswordEncoder}
 import kafka.utils.Implicits._
 import kafka.zk.{AdminZkClient, KafkaZkClient}
@@ -262,15 +263,35 @@ class DynamicBrokerConfig(private val kafkaConfig: 
KafkaConfig) extends Logging
     }
     addReconfigurable(kafkaServer.kafkaYammerMetrics)
     addReconfigurable(new DynamicMetricsReporters(kafkaConfig.brokerId, 
kafkaServer.config, kafkaServer.metrics, kafkaServer.clusterId))
-    addReconfigurable(new DynamicClientQuotaCallback(kafkaServer))
+    addReconfigurable(new 
DynamicClientQuotaCallback(kafkaServer.quotaManagers, kafkaServer.config))
 
-    addBrokerReconfigurable(new DynamicThreadPool(kafkaServer))
+    addBrokerReconfigurable(new BrokerDynamicThreadPool(kafkaServer))
     addBrokerReconfigurable(new DynamicLogConfig(kafkaServer.logManager, 
kafkaServer))
     addBrokerReconfigurable(new DynamicListenerConfig(kafkaServer))
     addBrokerReconfigurable(kafkaServer.socketServer)
     addBrokerReconfigurable(new 
DynamicProducerStateManagerConfig(kafkaServer.logManager.producerStateManagerConfig))
   }
 
+  /**
+   * Add reconfigurables to be notified when a dynamic controller config is 
updated.
+   */
+  def addReconfigurables(controller: ControllerServer): Unit = {
+    controller.authorizer match {
+      case Some(authz: Reconfigurable) => addReconfigurable(authz)
+      case _ =>
+    }
+    if (!kafkaConfig.processRoles.contains(BrokerRole)) {
+      // only add these if the controller isn't also running the broker role
+      // because these would already be added via the broker in that case
+      addReconfigurable(controller.kafkaYammerMetrics)
+      addReconfigurable(new DynamicMetricsReporters(kafkaConfig.nodeId, 
controller.config, controller.metrics, controller.clusterId))
+    }
+    addReconfigurable(new DynamicClientQuotaCallback(controller.quotaManagers, 
controller.config))
+    addBrokerReconfigurable(new ControllerDynamicThreadPool(controller))
+    // TODO: addBrokerReconfigurable(new DynamicListenerConfig(controller))
+    addBrokerReconfigurable(controller.socketServer)
+  }
+
   def addReconfigurable(reconfigurable: Reconfigurable): Unit = 
CoreUtils.inWriteLock(lock) {
     verifyReconfigurableConfigs(reconfigurable.reconfigurableConfigs.asScala)
     reconfigurables.add(reconfigurable)
@@ -704,19 +725,12 @@ object DynamicThreadPool {
     KafkaConfig.NumReplicaFetchersProp,
     KafkaConfig.NumRecoveryThreadsPerDataDirProp,
     KafkaConfig.BackgroundThreadsProp)
-}
 
-class DynamicThreadPool(server: KafkaBroker) extends BrokerReconfigurable {
-
-  override def reconfigurableConfigs: Set[String] = {
-    DynamicThreadPool.ReconfigurableConfigs
-  }
-
-  override def validateReconfiguration(newConfig: KafkaConfig): Unit = {
+  def validateReconfiguration(currentConfig: KafkaConfig, newConfig: 
KafkaConfig): Unit = {
     newConfig.values.forEach { (k, v) =>
-      if (DynamicThreadPool.ReconfigurableConfigs.contains(k)) {
+      if (ReconfigurableConfigs.contains(k)) {
         val newValue = v.asInstanceOf[Int]
-        val oldValue = currentValue(k)
+        val oldValue = getValue(currentConfig, k)
         if (newValue != oldValue) {
           val errorMsg = s"Dynamic thread count update validation failed for 
$k=$v"
           if (newValue <= 0)
@@ -730,6 +744,43 @@ class DynamicThreadPool(server: KafkaBroker) extends 
BrokerReconfigurable {
     }
   }
 
+  def getValue(config: KafkaConfig, name: String): Int = {
+    name match {
+      case KafkaConfig.NumIoThreadsProp => config.numIoThreads
+      case KafkaConfig.NumReplicaFetchersProp => config.numReplicaFetchers
+      case KafkaConfig.NumRecoveryThreadsPerDataDirProp => 
config.numRecoveryThreadsPerDataDir
+      case KafkaConfig.BackgroundThreadsProp => config.backgroundThreads
+      case n => throw new IllegalStateException(s"Unexpected config $n")
+    }
+  }
+}
+
+class ControllerDynamicThreadPool(controller: ControllerServer) extends 
BrokerReconfigurable {
+
+  override def reconfigurableConfigs: Set[String] = {
+    Set(KafkaConfig.NumIoThreadsProp)
+  }
+
+  override def validateReconfiguration(newConfig: KafkaConfig): Unit = {
+    DynamicThreadPool.validateReconfiguration(controller.config, newConfig) // 
common validation
+  }
+
+  override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): 
Unit = {
+    if (newConfig.numIoThreads != oldConfig.numIoThreads)
+      
controller.controllerApisHandlerPool.resizeThreadPool(newConfig.numIoThreads)
+  }
+}
+
+class BrokerDynamicThreadPool(server: KafkaBroker) extends 
BrokerReconfigurable {
+
+  override def reconfigurableConfigs: Set[String] = {
+    DynamicThreadPool.ReconfigurableConfigs
+  }
+
+  override def validateReconfiguration(newConfig: KafkaConfig): Unit = {
+    DynamicThreadPool.validateReconfiguration(server.config, newConfig)
+  }
+
   override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): 
Unit = {
     if (newConfig.numIoThreads != oldConfig.numIoThreads)
       
server.dataPlaneRequestHandlerPool.resizeThreadPool(newConfig.numIoThreads)
@@ -740,16 +791,6 @@ class DynamicThreadPool(server: KafkaBroker) extends 
BrokerReconfigurable {
     if (newConfig.backgroundThreads != oldConfig.backgroundThreads)
       server.kafkaScheduler.resizeThreadPool(newConfig.backgroundThreads)
   }
-
-  private def currentValue(name: String): Int = {
-    name match {
-      case KafkaConfig.NumIoThreadsProp => server.config.numIoThreads
-      case KafkaConfig.NumReplicaFetchersProp => 
server.config.numReplicaFetchers
-      case KafkaConfig.NumRecoveryThreadsPerDataDirProp => 
server.config.numRecoveryThreadsPerDataDir
-      case KafkaConfig.BackgroundThreadsProp => server.config.backgroundThreads
-      case n => throw new IllegalStateException(s"Unexpected config $n")
-    }
-  }
 }
 
 class DynamicMetricsReporters(brokerId: Int, config: KafkaConfig, metrics: 
Metrics, clusterId: String) extends Reconfigurable {
@@ -911,13 +952,16 @@ object DynamicListenerConfig {
   )
 }
 
-class DynamicClientQuotaCallback(server: KafkaBroker) extends Reconfigurable {
+class DynamicClientQuotaCallback(
+  quotaManagers: QuotaFactory.QuotaManagers,
+  serverConfig: KafkaConfig
+) extends Reconfigurable {
 
   override def configure(configs: util.Map[String, _]): Unit = {}
 
   override def reconfigurableConfigs(): util.Set[String] = {
     val configs = new util.HashSet[String]()
-    server.quotaManagers.clientQuotaCallback.foreach {
+    quotaManagers.clientQuotaCallback.foreach {
       case callback: Reconfigurable => 
configs.addAll(callback.reconfigurableConfigs)
       case _ =>
     }
@@ -925,17 +969,16 @@ class DynamicClientQuotaCallback(server: KafkaBroker) 
extends Reconfigurable {
   }
 
   override def validateReconfiguration(configs: util.Map[String, _]): Unit = {
-    server.quotaManagers.clientQuotaCallback.foreach {
+    quotaManagers.clientQuotaCallback.foreach {
       case callback: Reconfigurable => 
callback.validateReconfiguration(configs)
       case _ =>
     }
   }
 
   override def reconfigure(configs: util.Map[String, _]): Unit = {
-    val config = server.config
-    server.quotaManagers.clientQuotaCallback.foreach {
+    quotaManagers.clientQuotaCallback.foreach {
       case callback: Reconfigurable =>
-        config.dynamicConfig.maybeReconfigure(callback, 
config.dynamicConfig.currentKafkaConfig, configs)
+        serverConfig.dynamicConfig.maybeReconfigure(callback, 
serverConfig.dynamicConfig.currentKafkaConfig, configs)
         true
       case _ => false
     }
diff --git 
a/core/src/main/scala/kafka/server/metadata/DynamicClientQuotaPublisher.scala 
b/core/src/main/scala/kafka/server/metadata/DynamicClientQuotaPublisher.scala
new file mode 100644
index 00000000000..0533161b6d1
--- /dev/null
+++ 
b/core/src/main/scala/kafka/server/metadata/DynamicClientQuotaPublisher.scala
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server.metadata
+
+import kafka.server.KafkaConfig
+import kafka.utils.Logging
+import org.apache.kafka.image.loader.{LogDeltaManifest, SnapshotManifest}
+import org.apache.kafka.image.{MetadataDelta, MetadataImage}
+import org.apache.kafka.server.fault.FaultHandler
+
+
+class DynamicClientQuotaPublisher(
+  conf: KafkaConfig,
+  faultHandler: FaultHandler,
+  nodeType: String,
+  clientQuotaMetadataManager: ClientQuotaMetadataManager,
+) extends Logging with org.apache.kafka.image.publisher.MetadataPublisher {
+  logIdent = s"[${name()}] "
+
+  def publish(delta: MetadataDelta, newImage: MetadataImage): Unit = {
+    val deltaName = s"MetadataDelta up to 
${newImage.highestOffsetAndEpoch().offset}"
+    try {
+        Option(delta.clientQuotasDelta()).foreach { clientQuotasDelta =>
+          clientQuotaMetadataManager.update(clientQuotasDelta)
+        }
+    } catch {
+      case t: Throwable => faultHandler.handleFault("Uncaught exception while 
" +
+        s"publishing dynamic client quota changes from ${deltaName}", t)
+    }
+  }
+
+  /**
+   * Returns the name of this publisher.
+   *
+   * @return The publisher name.
+   */
+  override def name(): String = s"DynamicClientQuotaPublisher ${nodeType} 
id=${conf.nodeId}"
+
+  /**
+   * Publish a new cluster metadata snapshot that we loaded.
+   *
+   * @param delta    The delta between the previous state and the new one.
+   * @param newImage The complete new state.
+   * @param manifest The contents of what was published.
+   */
+  override def publishSnapshot(delta: MetadataDelta, newImage: MetadataImage, 
manifest: SnapshotManifest): Unit = {
+    publish(delta, newImage)
+  }
+
+  /**
+   * Publish a change to the cluster metadata.
+   *
+   * @param delta    The delta between the previous state and the new one.
+   * @param newImage The complete new state.
+   * @param manifest The contents of what was published.
+   */
+  override def publishLogDelta(delta: MetadataDelta, newImage: MetadataImage, 
manifest: LogDeltaManifest): Unit = {
+    publish(delta, newImage)
+  }
+
+  /**
+   * Close this metadata publisher.
+   */
+  override def close(): Unit = {
+    // nothing to close
+  }
+}
diff --git 
a/core/src/main/scala/kafka/server/metadata/DynamicConfigPublisher.scala 
b/core/src/main/scala/kafka/server/metadata/DynamicConfigPublisher.scala
index 12ff51d4039..a3581350137 100644
--- a/core/src/main/scala/kafka/server/metadata/DynamicConfigPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/DynamicConfigPublisher.scala
@@ -22,6 +22,7 @@ import kafka.server.ConfigAdminManager.toLoggableProps
 import kafka.server.{ConfigEntityName, ConfigHandler, ConfigType, KafkaConfig}
 import kafka.utils.Logging
 import org.apache.kafka.common.config.ConfigResource.Type.{BROKER, TOPIC}
+import org.apache.kafka.image.loader.{LogDeltaManifest, SnapshotManifest}
 import org.apache.kafka.image.{MetadataDelta, MetadataImage}
 import org.apache.kafka.server.fault.FaultHandler
 
@@ -30,9 +31,9 @@ class DynamicConfigPublisher(
   conf: KafkaConfig,
   faultHandler: FaultHandler,
   dynamicConfigHandlers: Map[String, ConfigHandler],
-  nodeType: String
-) extends Logging {
-  logIdent = s"[DynamicConfigPublisher nodeType=${nodeType} id=${conf.nodeId}] 
"
+  nodeType: String,
+) extends Logging with org.apache.kafka.image.publisher.MetadataPublisher {
+  logIdent = s"[${name()}] "
 
   def publish(delta: MetadataDelta, newImage: MetadataImage): Unit = {
     val deltaName = s"MetadataDelta up to 
${newImage.highestOffsetAndEpoch().offset}"
@@ -100,4 +101,40 @@ class DynamicConfigPublisher(
   def reloadUpdatedFilesWithoutConfigChange(props: Properties): Unit = {
     conf.dynamicConfig.reloadUpdatedFilesWithoutConfigChange(props)
   }
+
+  /**
+   * Returns the name of this publisher.
+   *
+   * @return The publisher name.
+   */
+  override def name(): String = s"DynamicConfigPublisher ${nodeType} 
id=${conf.nodeId}"
+
+  /**
+   * Publish a new cluster metadata snapshot that we loaded.
+   *
+   * @param delta    The delta between the previous state and the new one.
+   * @param newImage The complete new state.
+   * @param manifest The contents of what was published.
+   */
+  override def publishSnapshot(delta: MetadataDelta, newImage: MetadataImage, 
manifest: SnapshotManifest): Unit = {
+    publish(delta, newImage)
+  }
+
+  /**
+   * Publish a change to the cluster metadata.
+   *
+   * @param delta    The delta between the previous state and the new one.
+   * @param newImage The complete new state.
+   * @param manifest The contents of what was published.
+   */
+  override def publishLogDelta(delta: MetadataDelta, newImage: MetadataImage, 
manifest: LogDeltaManifest): Unit = {
+    publish(delta, newImage)
+  }
+
+  /**
+   * Close this metadata publisher.
+   */
+  override def close(): Unit = {
+    // nothing to close
+  }
 }
diff --git a/core/src/test/java/kafka/test/MockController.java 
b/core/src/test/java/kafka/test/MockController.java
index bd9dd5a649a..289ef75a560 100644
--- a/core/src/test/java/kafka/test/MockController.java
+++ b/core/src/test/java/kafka/test/MockController.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.acl.AclBinding;
 import org.apache.kafka.common.acl.AclBindingFilter;
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.errors.NotControllerException;
+import org.apache.kafka.common.errors.ThrottlingQuotaExceededException;
 import org.apache.kafka.common.message.AllocateProducerIdsRequestData;
 import org.apache.kafka.common.message.AllocateProducerIdsResponseData;
 import org.apache.kafka.common.message.AlterPartitionRequestData;
@@ -104,6 +105,11 @@ public class MockController implements Controller {
             return this;
         }
 
+        public Builder newInitialTopic(String name, Uuid id, int 
numPartitions) {
+            initialTopics.put(name, new MockTopic(name, id, numPartitions));
+            return this;
+        }
+
         public MockController build() {
             return new MockController(initialTopics.values());
         }
@@ -149,30 +155,37 @@ public class MockController implements Controller {
             } else {
                 long topicId = nextTopicId.getAndIncrement();
                 Uuid topicUuid = new Uuid(0, topicId);
-                topicNameToId.put(topic.name(), topicUuid);
-                topics.put(topicUuid, new MockTopic(topic.name(), topicUuid));
+                MockTopic mockTopic = new MockTopic(topic.name(), topicUuid);
                 CreatableTopicResult creatableTopicResult = new 
CreatableTopicResult().
                     setName(topic.name()).
-                    setErrorCode(Errors.NONE.code()).
-                    setTopicId(topicUuid);
-                if (describable.contains(topic.name())) {
-                    // Note: we don't simulate topic configs here yet.
-                    // Just returning replication factor and numPartitions.
-                    if (topic.assignments() != null && 
!topic.assignments().isEmpty()) {
-                        creatableTopicResult.
-                            setTopicConfigErrorCode(Errors.NONE.code()).
-                            setReplicationFactor((short)
-                                
topic.assignments().iterator().next().brokerIds().size()).
-                            setNumPartitions(topic.assignments().size());
+                    setErrorCode(Errors.NONE.code());
+                try {
+                    context.applyPartitionChangeQuota(mockTopic.numPartitions);
+                    creatableTopicResult.setTopicId(topicUuid);
+                    topicNameToId.put(topic.name(), topicUuid);
+                    topics.put(topicUuid, mockTopic);
+                    if (describable.contains(topic.name())) {
+                        // Note: we don't simulate topic configs here yet.
+                        // Just returning replication factor and numPartitions.
+                        if (topic.assignments() != null && 
!topic.assignments().isEmpty()) {
+                            creatableTopicResult.
+                                setTopicConfigErrorCode(Errors.NONE.code()).
+                                setReplicationFactor((short)
+                                    
topic.assignments().iterator().next().brokerIds().size()).
+                                setNumPartitions(topic.assignments().size());
+                        } else {
+                            creatableTopicResult.
+                                setTopicConfigErrorCode(Errors.NONE.code()).
+                                
setReplicationFactor(topic.replicationFactor()).
+                                setNumPartitions(topic.numPartitions());
+                        }
                     } else {
                         creatableTopicResult.
-                            setTopicConfigErrorCode(Errors.NONE.code()).
-                            setReplicationFactor(topic.replicationFactor()).
-                            setNumPartitions(topic.numPartitions());
+                            
setTopicConfigErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code());
                     }
-                } else {
-                    creatableTopicResult.
-                        
setTopicConfigErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code());
+                } catch (ThrottlingQuotaExceededException e) {
+                    ApiError apiError = new 
ApiError(Errors.THROTTLING_QUOTA_EXCEEDED);
+                    
creatableTopicResult.setErrorCode(apiError.error().code()).setErrorMessage(apiError.message());
                 }
                 response.topics().add(creatableTopicResult);
             }
@@ -191,10 +204,16 @@ public class MockController implements Controller {
     static class MockTopic {
         private final String name;
         private final Uuid id;
+        private final int numPartitions;
 
         MockTopic(String name, Uuid id) {
+            this(name, id, 1);
+        }
+
+        MockTopic(String name, Uuid id, int numPartitions) {
             this.name = name;
             this.id = id;
+            this.numPartitions = numPartitions;
         }
     }
 
@@ -260,12 +279,18 @@ public class MockController implements Controller {
         }
         Map<Uuid, ApiError> results = new HashMap<>();
         for (Uuid topicId : topicIds) {
-            MockTopic topic = topics.remove(topicId);
+            MockTopic topic = topics.get(topicId);
             if (topic == null) {
                 results.put(topicId, new ApiError(Errors.UNKNOWN_TOPIC_ID));
             } else {
-                topicNameToId.remove(topic.name);
-                results.put(topicId, ApiError.NONE);
+                try {
+                    context.applyPartitionChangeQuota(topic.numPartitions);
+                    topics.remove(topicId);
+                    topicNameToId.remove(topic.name);
+                    results.put(topicId, ApiError.NONE);
+                } catch (ThrottlingQuotaExceededException e) {
+                    results.put(topicId, new 
ApiError(Errors.THROTTLING_QUOTA_EXCEEDED));
+                }
             }
         }
         return CompletableFuture.completedFuture(results);
@@ -434,9 +459,18 @@ public class MockController implements Controller {
         List<CreatePartitionsTopicResult> results = new ArrayList<>();
         for (CreatePartitionsTopic topic : topicList) {
             if (topicNameToId.containsKey(topic.name())) {
-                results.add(new 
CreatePartitionsTopicResult().setName(topic.name()).
-                    setErrorCode(Errors.NONE.code()).
-                    setErrorMessage(null));
+                try {
+                    context.applyPartitionChangeQuota(topic.count());
+                    results.add(new 
CreatePartitionsTopicResult().setName(topic.name()).
+                        setErrorCode(Errors.NONE.code()).
+                        setErrorMessage(null));
+                } catch (ThrottlingQuotaExceededException e) {
+                    ApiError apiError = new 
ApiError(Errors.THROTTLING_QUOTA_EXCEEDED);
+                    results.add(new CreatePartitionsTopicResult().
+                        setName(topic.name()).
+                        setErrorCode(apiError.error().code()).
+                        setErrorMessage(apiError.message()));
+                }
             } else {
                 results.add(new 
CreatePartitionsTopicResult().setName(topic.name()).
                     setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()).
diff --git a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
index 768b61e70d4..aee0c182ec2 100644
--- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
+++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
@@ -31,14 +31,19 @@ import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.Errors._
 import org.apache.kafka.common.quota.{ClientQuotaAlteration, 
ClientQuotaEntity, ClientQuotaFilter, ClientQuotaFilterComponent}
 import org.apache.kafka.common.requests.{ApiError, DescribeClusterRequest, 
DescribeClusterResponse}
-import org.apache.kafka.common.{Endpoint, TopicPartition, TopicPartitionInfo}
+import org.apache.kafka.common.security.auth.KafkaPrincipal
+import org.apache.kafka.common.{Cluster, Endpoint, Reconfigurable, 
TopicPartition, TopicPartitionInfo}
 import org.apache.kafka.image.ClusterImage
 import org.apache.kafka.metadata.BrokerState
 import org.apache.kafka.server.authorizer._
 import org.apache.kafka.server.common.MetadataVersion
 import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
+import org.apache.kafka.server.quota
+import org.apache.kafka.server.quota.{ClientQuotaCallback, ClientQuotaType}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{Tag, Test, Timeout}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
 import org.slf4j.LoggerFactory
 
 import java.io.File
@@ -581,10 +586,10 @@ class KRaftClusterTest {
         assertEquals(Seq(ApiError.NONE), incrementalAlter(admin, Seq(
           (new ConfigResource(Type.BROKER, ""), Seq(
             new AlterConfigOp(new ConfigEntry("log.roll.ms", "1234567"), 
OpType.SET),
-            new AlterConfigOp(new ConfigEntry("max.connections.per.ip", "6"), 
OpType.SET))))))
+            new AlterConfigOp(new ConfigEntry("max.connections.per.ip", "60"), 
OpType.SET))))))
         validateConfigs(admin, Map(new ConfigResource(Type.BROKER, "") -> Seq(
           ("log.roll.ms", "1234567"),
-          ("max.connections.per.ip", "6"))), true)
+          ("max.connections.per.ip", "60"))), true)
 
         admin.createTopics(Arrays.asList(
           new NewTopic("foo", 2, 3.toShort),
@@ -967,6 +972,48 @@ class KRaftClusterTest {
       cluster.close()
     }
   }
+
+  /**
+   * Checks that a dynamic update of the DummyClientQuotaCallback value key, issued through the
+   * Admin client, is observed by the quota callback of both the controller and the broker.
+   *
+   * @param combinedController passed to TestKitNodes.Builder.setCoResident
+   */
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true))
+  def testReconfigureControllerClientQuotas(combinedController: Boolean): Unit = {
+    val valueKey = DummyClientQuotaCallback.dummyClientQuotaCallbackValueConfigKey
+    val nodes = new TestKitNodes.Builder().
+      setNumBrokerNodes(1).
+      setCoResident(combinedController).
+      setNumControllerNodes(1).build()
+    val cluster = new KafkaClusterTestKit.Builder(nodes).
+      setConfigProp("client.quota.callback.class", classOf[DummyClientQuotaCallback].getName).
+      setConfigProp(valueKey, "0").
+      build()
+
+    // Re-checked under TestUtils.retry: the first controller's callback, then the first broker's
+    // callback, must both report `expected`.
+    def assertConfigValue(expected: Int): Unit = {
+      TestUtils.retry(60000) {
+        val controllerCallback = cluster.controllers().values().iterator().next().
+          quotaManagers.clientQuotaCallback.get.asInstanceOf[DummyClientQuotaCallback]
+        assertEquals(expected, controllerCallback.value)
+        val brokerCallback = cluster.brokers().values().iterator().next().
+          quotaManagers.clientQuotaCallback.get.asInstanceOf[DummyClientQuotaCallback]
+        assertEquals(expected, brokerCallback.value)
+      }
+    }
+
+    try {
+      cluster.format()
+      cluster.startup()
+      cluster.waitForReadyBrokers()
+      // The value configured at startup is in effect before anything is altered.
+      assertConfigValue(0)
+      val admin = Admin.create(cluster.clientProperties())
+      try {
+        val brokerResource = new ConfigResource(Type.BROKER, "")
+        val setValueToOne = new AlterConfigOp(new ConfigEntry(valueKey, "1"), OpType.SET)
+        admin.incrementalAlterConfigs(
+          Collections.singletonMap(brokerResource, Collections.singletonList(setValueToOne))).
+          all().get()
+      } finally {
+        admin.close()
+      }
+      assertConfigValue(1)
+    } finally {
+      cluster.close()
+    }
+  }
 }
 
 class BadAuthorizer() extends Authorizer {
@@ -987,3 +1034,39 @@ class BadAuthorizer() extends Authorizer {
 
   override def deleteAcls(requestContext: AuthorizableRequestContext, 
aclBindingFilters: util.List[AclBindingFilter]): util.List[_ <: 
CompletionStage[AclDeleteResult]] = ???
 }
+
+object DummyClientQuotaCallback {
+  // Config key whose value configure()/reconfigure() parse into DummyClientQuotaCallback.value.
+  val dummyClientQuotaCallbackValueConfigKey = "dummy.client.quota.callback.value"
+}
+
+/**
+ * Minimal ClientQuotaCallback that is also Reconfigurable. Its quota methods return fixed
+ * answers; the only state it keeps is the integer parsed from
+ * DummyClientQuotaCallback.dummyClientQuotaCallbackValueConfigKey, which tests read back to
+ * confirm that a configuration update reached this instance.
+ */
+class DummyClientQuotaCallback() extends ClientQuotaCallback with Reconfigurable {
+  // Written by configure()/reconfigure() and read by the polling test.
+  // NOTE(review): presumably the writer is not the test thread; @volatile makes the update visible
+  // to the reader either way.
+  @volatile var value = 0
+
+  override def quotaMetricTags(quotaType: ClientQuotaType, principal: KafkaPrincipal, clientId: String): util.Map[String, String] = Collections.emptyMap()
+
+  override def quotaLimit(quotaType: ClientQuotaType, metricTags: util.Map[String, String]): lang.Double = 1.0
+
+  override def updateQuota(quotaType: ClientQuotaType, quotaEntity: quota.ClientQuotaEntity, newValue: Double): Unit = {}
+
+  override def removeQuota(quotaType: ClientQuotaType, quotaEntity: quota.ClientQuotaEntity): Unit = {}
+
+  override def quotaResetRequired(quotaType: ClientQuotaType): Boolean = true
+
+  override def updateClusterMetadata(cluster: Cluster): Boolean = false
+
+  override def close(): Unit = {}
+
+  /**
+   * Parses the value key into `value`; when the key is absent, `value` is left unchanged.
+   *
+   * @param configs configuration map that may contain the value key
+   */
+  override def configure(configs: util.Map[String, _]): Unit = {
+    val newValue = configs.get(DummyClientQuotaCallback.dummyClientQuotaCallbackValueConfigKey)
+    if (newValue != null) {
+      value = Integer.parseInt(newValue.toString)
+    }
+  }
+
+  override def reconfigurableConfigs(): util.Set[String] = Set(DummyClientQuotaCallback.dummyClientQuotaCallbackValueConfigKey).asJava
+
+  override def validateReconfiguration(configs: util.Map[String, _]): Unit = {
+  }
+
+  // Reconfiguration applies the same parsing as the initial configure().
+  override def reconfigure(configs: util.Map[String, _]): Unit = configure(configs)
+}
+
diff --git a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
index c5919282802..ffa56de63db 100644
--- a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
@@ -70,6 +70,27 @@ import scala.jdk.CollectionConverters._
 import scala.reflect.ClassTag
 
 class ControllerApisTest {
+  // Shared constants for the mock quota: the exception message it throws with and the throttle
+  // time it reports once exceeded.
+  object MockControllerMutationQuota {
+    val errorMessage = "quota exceeded in test"
+    var throttleTimeMs = 1000
+  }
+
+  /**
+   * Test double for ControllerMutationQuota. It sums the permits passed to record() and counts as
+   * exceeded once that sum is strictly greater than `quota`.
+   *
+   * @param quota the largest permit total that is still accepted
+   */
+  case class MockControllerMutationQuota(quota: Int) extends ControllerMutationQuota {
+    var permitsRecorded = 0.0
+
+    override def isExceeded: Boolean = permitsRecorded > quota
+
+    /**
+     * Adds `permits` to the running total; negative values are ignored.
+     *
+     * @throws ThrottlingQuotaExceededException if the total exceeds `quota` after the addition
+     */
+    override def record(permits: Double): Unit = {
+      val countable = permits >= 0
+      if (countable) {
+        permitsRecorded = permitsRecorded + permits
+        if (isExceeded) {
+          throw new ThrottlingQuotaExceededException(throttleTime, MockControllerMutationQuota.errorMessage)
+        }
+      }
+    }
+
+    // Zero until the quota is exceeded, then the shared mock throttle time.
+    override def throttleTime: Int =
+      if (!isExceeded) 0 else MockControllerMutationQuota.throttleTimeMs
+  }
+
   private val nodeId = 1
   private val brokerRack = "Rack1"
   private val clientID = "Client1"
@@ -78,15 +99,38 @@ class ControllerApisTest {
   private val time = new MockTime
   private val clientQuotaManager: ClientQuotaManager = 
mock(classOf[ClientQuotaManager])
   private val clientRequestQuotaManager: ClientRequestQuotaManager = 
mock(classOf[ClientRequestQuotaManager])
-  private val clientControllerQuotaManager: ControllerMutationQuotaManager = 
mock(classOf[ControllerMutationQuotaManager])
+  private val neverThrottlingClientControllerQuotaManager: 
ControllerMutationQuotaManager = mock(classOf[ControllerMutationQuotaManager])
+  when(neverThrottlingClientControllerQuotaManager.newQuotaFor(
+    any(classOf[RequestChannel.Request]),
+    any(classOf[Short])
+  )).thenReturn(
+    MockControllerMutationQuota(Integer.MAX_VALUE) // never throttles
+  )
+  private val alwaysThrottlingClientControllerQuotaManager: 
ControllerMutationQuotaManager = mock(classOf[ControllerMutationQuotaManager])
+  when(alwaysThrottlingClientControllerQuotaManager.newQuotaFor(
+    any(classOf[RequestChannel.Request]),
+    any(classOf[Short])
+  )).thenReturn(
+    MockControllerMutationQuota(0) // always throttles
+  )
   private val replicaQuotaManager: ReplicationQuotaManager = 
mock(classOf[ReplicationQuotaManager])
   private val raftManager: RaftManager[ApiMessageAndVersion] = 
mock(classOf[RaftManager[ApiMessageAndVersion]])
 
-  private val quotas = QuotaManagers(
+  private val quotasNeverThrottleControllerMutations = QuotaManagers(
+    clientQuotaManager,
+    clientQuotaManager,
+    clientRequestQuotaManager,
+    neverThrottlingClientControllerQuotaManager,
+    replicaQuotaManager,
+    replicaQuotaManager,
+    replicaQuotaManager,
+    None)
+
+  private val quotasAlwaysThrottleControllerMutations = QuotaManagers(
     clientQuotaManager,
     clientQuotaManager,
     clientRequestQuotaManager,
-    clientControllerQuotaManager,
+    alwaysThrottlingClientControllerQuotaManager,
     replicaQuotaManager,
     replicaQuotaManager,
     replicaQuotaManager,
@@ -94,7 +138,8 @@ class ControllerApisTest {
 
   private def createControllerApis(authorizer: Option[Authorizer],
                                    controller: Controller,
-                                   props: Properties = new Properties()): 
ControllerApis = {
+                                   props: Properties = new Properties(),
+                                   throttle: Boolean = false): ControllerApis 
= {
     props.put(KafkaConfig.NodeIdProp, nodeId: java.lang.Integer)
     props.put(KafkaConfig.ProcessRolesProp, "controller")
     props.put(KafkaConfig.ControllerListenerNamesProp, "PLAINTEXT")
@@ -102,7 +147,7 @@ class ControllerApisTest {
     new ControllerApis(
       requestChannel,
       authorizer,
-      quotas,
+      if (throttle) quotasAlwaysThrottleControllerMutations else 
quotasNeverThrottleControllerMutations,
       time,
       controller,
       raftManager,
@@ -562,6 +607,34 @@ class ControllerApisTest {
       _ => Set("baz")).get().topics().asScala.toSet)
   }
 
+  /**
+   * Sends a CreateTopics request for one single-partition topic through ControllerApis and checks
+   * the per-topic result and the response throttle time.
+   *
+   * @param throttle forwarded to createControllerApis to select the always-throttling quota managers
+   */
+  @ParameterizedTest(name = "testCreateTopicsMutationQuota with throttle: {0}")
+  @ValueSource(booleans = Array(true, false))
+  def testCreateTopicsMutationQuota(throttle: Boolean): Unit = {
+    val topicName = "foo"
+    val controller = new MockController.Builder().build()
+    val controllerApis = createControllerApis(None, controller, new Properties(), throttle)
+    val creatableTopic = new CreatableTopic().setName(topicName).setNumPartitions(1).setReplicationFactor(1)
+    val requestData = new CreateTopicsRequestData().setTopics(
+      new CreatableTopicCollection(util.Collections.singletonList(creatableTopic).iterator()))
+    val request = new CreateTopicsRequest.Builder(requestData).build()
+
+    val response = handleRequest[CreateTopicsResponse](request, controllerApis)
+    val actualTopics = response.data.topics().asScala.toSet
+
+    if (throttle) {
+      // Throttled: only the quota error and its message are set on the result.
+      val throttledResult = new CreatableTopicResult().setName(topicName).
+        setErrorCode(THROTTLING_QUOTA_EXCEEDED.code()).
+        setErrorMessage(THROTTLING_QUOTA_EXCEEDED.message())
+      assertEquals(Set(throttledResult), actualTopics)
+      assertEquals(MockControllerMutationQuota.throttleTimeMs, response.throttleTimeMs())
+    } else {
+      val createdResult = new CreatableTopicResult().setName(topicName).
+        setErrorCode(NONE.code()).
+        setTopicId(new Uuid(0L, 1L)).
+        setNumPartitions(1).
+        setReplicationFactor(1).
+        setTopicConfigErrorCode(NONE.code())
+      assertEquals(Set(createdResult), actualTopics)
+      assertEquals(0, response.throttleTimeMs())
+    }
+  }
+
   @Test
   def testDeleteTopicsByName(): Unit = {
     val fooId = Uuid.fromString("vZKYST0pSA2HO5x_6hoO2Q")
@@ -831,6 +904,32 @@ class ControllerApisTest {
     assertEquals(Some(Errors.TOPIC_AUTHORIZATION_FAILED), results.find(_.name 
== "bar").map(result => Errors.forCode(result.errorCode)))
   }
 
+  /**
+   * Sends a CreatePartitions request (count 2) for an existing topic through ControllerApis and
+   * checks the per-topic result and the response throttle time.
+   *
+   * @param throttle forwarded to createControllerApis to select the always-throttling quota managers
+   */
+  @ParameterizedTest(name = "testCreatePartitionsMutationQuota with throttle: {0}")
+  @ValueSource(booleans = Array(true, false))
+  def testCreatePartitionsMutationQuota(throttle: Boolean): Unit = {
+    val topicName = "foo"
+    // The controller starts out already knowing the topic.
+    val controller = new MockController.Builder()
+      .newInitialTopic(topicName, Uuid.fromString("vZKYST0pSA2HO5x_6hoO2Q"), 1)
+      .build()
+    val controllerApis = createControllerApis(None, controller, new Properties(), throttle)
+
+    val partitionsTopic = new CreatePartitionsTopic().setName(topicName).setAssignments(null).setCount(2)
+    val requestData = new CreatePartitionsRequestData()
+    requestData.topics().add(partitionsTopic)
+    val request = new CreatePartitionsRequest.Builder(requestData).build()
+
+    val response = handleRequest[CreatePartitionsResponse](request, controllerApis)
+    val actualResults = response.data.results().asScala.toSet
+
+    if (throttle) {
+      val throttledResult = new CreatePartitionsTopicResult().setName(topicName).
+        setErrorCode(THROTTLING_QUOTA_EXCEEDED.code()).
+        setErrorMessage(THROTTLING_QUOTA_EXCEEDED.message())
+      assertEquals(Set(throttledResult), actualResults)
+      assertEquals(MockControllerMutationQuota.throttleTimeMs, response.throttleTimeMs())
+    } else {
+      val successResult = new CreatePartitionsTopicResult().setName(topicName).
+        setErrorCode(NONE.code())
+      assertEquals(Set(successResult), actualResults)
+      assertEquals(0, response.throttleTimeMs())
+    }
+  }
+
   @Test
   def testElectLeadersAuthorization(): Unit = {
     val authorizer = mock(classOf[Authorizer])
@@ -918,6 +1017,35 @@ class ControllerApisTest {
     assertEquals(Errors.NOT_CONTROLLER, 
Errors.forCode(topicIdResponse.errorCode))
   }
 
+  /**
+   * Sends a DeleteTopics request (by topic id) for an existing topic through ControllerApis and
+   * checks the per-topic result and the response throttle time.
+   *
+   * @param throttle forwarded to createControllerApis to select the always-throttling quota managers
+   */
+  @ParameterizedTest(name = "testDeleteTopicsMutationQuota with throttle: {0}")
+  @ValueSource(booleans = Array(true, false))
+  def testDeleteTopicsMutationQuota(throttle: Boolean): Unit = {
+    val topicName = "foo"
+    val topicId = Uuid.fromString("vZKYST0pSA2HO5x_6hoO2Q")
+    val controller = new MockController.Builder()
+      .newInitialTopic(topicName, topicId, 1)
+      .build()
+    val controllerApis = createControllerApis(None, controller, new Properties(), throttle)
+
+    val deleteState = new DeleteTopicState().setTopicId(topicId)
+    val requestData = new DeleteTopicsRequestData().setTopics(singletonList(deleteState))
+    val request = new DeleteTopicsRequest.Builder(requestData).build()
+
+    val response = handleRequest[DeleteTopicsResponse](request, controllerApis)
+    val actualResponses = response.data.responses().asScala.toSet
+
+    // Both outcomes carry the topic name and id; they differ only in error code and message.
+    if (throttle) {
+      val throttledResult = new DeletableTopicResult().setName(topicName).
+        setTopicId(topicId).
+        setErrorCode(THROTTLING_QUOTA_EXCEEDED.code()).
+        setErrorMessage(THROTTLING_QUOTA_EXCEEDED.message())
+      assertEquals(Set(throttledResult), actualResponses)
+      assertEquals(MockControllerMutationQuota.throttleTimeMs, response.throttleTimeMs())
+    } else {
+      val deletedResult = new DeletableTopicResult().setName(topicName).
+        setTopicId(topicId).
+        setErrorCode(NONE.code())
+      assertEquals(Set(deletedResult), actualResponses)
+      assertEquals(0, response.throttleTimeMs())
+    }
+  }
+
   @Test
   def testAllocateProducerIdsReturnsNotController(): Unit = {
     val controller = mock(classOf[Controller])
@@ -1007,6 +1135,7 @@ class ControllerApisTest {
 
   @AfterEach
   def tearDown(): Unit = {
-    quotas.shutdown()
+    quotasNeverThrottleControllerMutations.shutdown()
+    quotasAlwaysThrottleControllerMutations.shutdown()
   }
 }
diff --git a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
index 861ab321818..76617426f97 100755
--- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
@@ -33,8 +33,9 @@ import org.apache.kafka.common.config.{ConfigException, SslConfigs}
 import org.apache.kafka.common.metrics.{JmxReporter, Metrics}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.server.authorizer._
+import org.apache.kafka.server.metrics.KafkaYammerMetrics
 import org.apache.kafka.server.util.KafkaScheduler
-import org.apache.kafka.storage.internals.log.LogConfig
+import org.apache.kafka.storage.internals.log.{LogConfig, 
ProducerStateManagerConfig}
 import org.apache.kafka.test.MockMetricsReporter
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Test
@@ -153,7 +154,7 @@ class DynamicBrokerConfigTest {
     Mockito.when(serverMock.kafkaScheduler).thenReturn(schedulerMock)
 
     config.dynamicConfig.initialize(None)
-    config.dynamicConfig.addBrokerReconfigurable(new 
DynamicThreadPool(serverMock))
+    config.dynamicConfig.addBrokerReconfigurable(new 
BrokerDynamicThreadPool(serverMock))
     config.dynamicConfig.addReconfigurable(acceptorMock)
 
     val props = new Properties()
@@ -445,6 +446,32 @@ class DynamicBrokerConfigTest {
     assertThrows(classOf[ConfigException], () => 
dynamicListenerConfig.validateReconfiguration(KafkaConfig(props)))
   }
 
+  /**
+   * Authorizer stub whose only real behavior is remembering the "super.users" value it was last
+   * reconfigured with, so tests can assert that a dynamic config update reached the authorizer.
+   * All authorization and ACL methods return null or an empty result.
+   */
+  class TestAuthorizer extends Authorizer with Reconfigurable {
+    // Last "super.users" value seen by reconfigure(); empty until the first reconfiguration.
+    @volatile var superUsers = ""
+
+    override def start(serverInfo: AuthorizerServerInfo): util.Map[Endpoint, _ <: CompletionStage[Void]] = Map.empty.asJava
+
+    override def authorize(requestContext: AuthorizableRequestContext, actions: util.List[Action]): util.List[AuthorizationResult] = null
+
+    override def createAcls(requestContext: AuthorizableRequestContext, aclBindings: util.List[AclBinding]): util.List[_ <: CompletionStage[AclCreateResult]] = null
+
+    override def deleteAcls(requestContext: AuthorizableRequestContext, aclBindingFilters: util.List[AclBindingFilter]): util.List[_ <: CompletionStage[AclDeleteResult]] = null
+
+    override def acls(filter: AclBindingFilter): lang.Iterable[AclBinding] = null
+
+    override def close(): Unit = {}
+
+    override def configure(configs: util.Map[String, _]): Unit = {}
+
+    override def reconfigurableConfigs(): util.Set[String] = Set("super.users").asJava
+
+    override def validateReconfiguration(configs: util.Map[String, _]): Unit = {}
+
+    /**
+     * Records the new "super.users" value. A map without that key leaves the previous value
+     * untouched instead of failing with a NullPointerException.
+     *
+     * @param configs the updated configuration values
+     */
+    override def reconfigure(configs: util.Map[String, _]): Unit = {
+      val updated = configs.get("super.users")
+      if (updated != null) {
+        superUsers = updated.toString
+      }
+    }
+  }
+
   @Test
   def testAuthorizerConfig(): Unit = {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port 
= 9092)
@@ -452,33 +479,119 @@ class DynamicBrokerConfigTest {
     oldConfig.dynamicConfig.initialize(None)
 
     val kafkaServer: KafkaServer = mock(classOf[kafka.server.KafkaServer])
-
-    class TestAuthorizer extends Authorizer with Reconfigurable {
-      @volatile var superUsers = ""
-      override def start(serverInfo: AuthorizerServerInfo): util.Map[Endpoint, 
_ <: CompletionStage[Void]] = Map.empty.asJava
-      override def authorize(requestContext: AuthorizableRequestContext, 
actions: util.List[Action]): util.List[AuthorizationResult] = null
-      override def createAcls(requestContext: AuthorizableRequestContext, 
aclBindings: util.List[AclBinding]): util.List[_ <: 
CompletionStage[AclCreateResult]] = null
-      override def deleteAcls(requestContext: AuthorizableRequestContext, 
aclBindingFilters: util.List[AclBindingFilter]): util.List[_ <: 
CompletionStage[AclDeleteResult]] = null
-      override def acls(filter: AclBindingFilter): lang.Iterable[AclBinding] = 
null
-      override def close(): Unit = {}
-      override def configure(configs: util.Map[String, _]): Unit = {}
-      override def reconfigurableConfigs(): util.Set[String] = 
Set("super.users").asJava
-      override def validateReconfiguration(configs: util.Map[String, _]): Unit 
= {}
-      override def reconfigure(configs: util.Map[String, _]): Unit = {
-        superUsers = configs.get("super.users").toString
-      }
-    }
+    when(kafkaServer.config).thenReturn(oldConfig)
+    
when(kafkaServer.kafkaYammerMetrics).thenReturn(KafkaYammerMetrics.INSTANCE)
+    val metrics: Metrics = mock(classOf[Metrics])
+    when(kafkaServer.metrics).thenReturn(metrics)
+    val quotaManagers: QuotaFactory.QuotaManagers = 
mock(classOf[QuotaFactory.QuotaManagers])
+    when(quotaManagers.clientQuotaCallback).thenReturn(None)
+    when(kafkaServer.quotaManagers).thenReturn(quotaManagers)
+    val socketServer: SocketServer = mock(classOf[SocketServer])
+    
when(socketServer.reconfigurableConfigs).thenReturn(SocketServer.ReconfigurableConfigs)
+    when(kafkaServer.socketServer).thenReturn(socketServer)
+    val logManager: LogManager = mock(classOf[LogManager])
+    val producerStateManagerConfig: ProducerStateManagerConfig = 
mock(classOf[ProducerStateManagerConfig])
+    
when(logManager.producerStateManagerConfig).thenReturn(producerStateManagerConfig)
+    when(kafkaServer.logManager).thenReturn(logManager)
 
     val authorizer = new TestAuthorizer
-    when(kafkaServer.config).thenReturn(oldConfig)
     when(kafkaServer.authorizer).thenReturn(Some(authorizer))
-    // We are only testing authorizer reconfiguration, ignore any exceptions 
due to incomplete mock
-    assertThrows(classOf[Throwable], () => 
kafkaServer.config.dynamicConfig.addReconfigurables(kafkaServer))
+
+    kafkaServer.config.dynamicConfig.addReconfigurables(kafkaServer)
     props.put("super.users", "User:admin")
     kafkaServer.config.dynamicConfig.updateBrokerConfig(0, props)
     assertEquals("User:admin", authorizer.superUsers)
   }
 
+  /**
+   * Builds properties for a node running both the broker and controller roles: starts from
+   * TestUtils.createBrokerConfig and appends a CONTROLLER listener to the listeners it produced.
+   *
+   * @param nodeId node id, also used as the single quorum voter id
+   * @param port   port handed to TestUtils.createBrokerConfig
+   * @return the assembled properties
+   */
+  private def createCombinedControllerConfig(
+    nodeId: Int,
+    port: Int
+  ): Properties = {
+    val props = TestUtils.createBrokerConfig(nodeId,
+      zkConnect = null,
+      enableControlledShutdown = true,
+      enableDeleteTopic = true,
+      port)
+    // Keep whatever listeners the broker config already declared and add the controller one.
+    val brokerListeners = props.get(KafkaConfig.ListenersProp)
+    props.put(KafkaConfig.ProcessRolesProp, "broker,controller")
+    props.put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
+    props.put(KafkaConfig.ListenersProp, s"${brokerListeners},CONTROLLER://localhost:0")
+    props.put(KafkaConfig.QuorumVotersProp, s"${nodeId}@localhost:0")
+    props
+  }
+
+  /**
+   * With a combined broker+controller config, registers a mocked ControllerServer's
+   * reconfigurables and checks that a "super.users" update is delivered to its authorizer.
+   */
+  @Test
+  def testCombinedControllerAuthorizerConfig(): Unit = {
+    val props = createCombinedControllerConfig(0, 9092)
+    val initialConfig = KafkaConfig.fromProps(props)
+    initialConfig.dynamicConfig.initialize(None)
+
+    // Collaborators handed back by the ControllerServer mock.
+    val metrics: Metrics = mock(classOf[Metrics])
+    val quotaManagers: QuotaFactory.QuotaManagers = mock(classOf[QuotaFactory.QuotaManagers])
+    when(quotaManagers.clientQuotaCallback).thenReturn(None)
+    val socketServer: SocketServer = mock(classOf[SocketServer])
+    when(socketServer.reconfigurableConfigs).thenReturn(SocketServer.ReconfigurableConfigs)
+    val authorizer = new TestAuthorizer
+
+    val controllerServer: ControllerServer = mock(classOf[kafka.server.ControllerServer])
+    when(controllerServer.config).thenReturn(initialConfig)
+    when(controllerServer.kafkaYammerMetrics).thenReturn(KafkaYammerMetrics.INSTANCE)
+    when(controllerServer.metrics).thenReturn(metrics)
+    when(controllerServer.quotaManagers).thenReturn(quotaManagers)
+    when(controllerServer.socketServer).thenReturn(socketServer)
+    when(controllerServer.authorizer).thenReturn(Some(authorizer))
+
+    controllerServer.config.dynamicConfig.addReconfigurables(controllerServer)
+    props.put("super.users", "User:admin")
+    controllerServer.config.dynamicConfig.updateBrokerConfig(0, props)
+    assertEquals("User:admin", authorizer.superUsers)
+  }
+
+  /**
+   * Builds properties for a controller-only node: starts from TestUtils.createBrokerConfig, drops
+   * the advertised listeners and replaces the listeners with a single CONTROLLER listener.
+   *
+   * @param nodeId node id, also used as the single quorum voter id
+   * @param port   port handed to TestUtils.createBrokerConfig
+   * @return the assembled properties
+   */
+  private def createIsolatedControllerConfig(
+    nodeId: Int,
+    port: Int
+  ): Properties = {
+    val props = TestUtils.createBrokerConfig(nodeId,
+      zkConnect = null,
+      enableControlledShutdown = true,
+      enableDeleteTopic = true,
+      port
+    )
+    props.put(KafkaConfig.ProcessRolesProp, "controller")
+    props.remove(KafkaConfig.AdvertisedListenersProp)
+
+    // Only the controller listener remains on this node.
+    props.put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
+    props.put(KafkaConfig.ListenersProp, "CONTROLLER://localhost:0")
+    props.put(KafkaConfig.QuorumVotersProp, s"${nodeId}@localhost:0")
+    props
+  }
+
+  /**
+   * With a controller-only config, registers a mocked ControllerServer's reconfigurables and
+   * checks that a "super.users" update is delivered to its authorizer.
+   */
+  @Test
+  def testIsolatedControllerAuthorizerConfig(): Unit = {
+    val props = createIsolatedControllerConfig(0, port = 9092)
+    val initialConfig = KafkaConfig.fromProps(props)
+    initialConfig.dynamicConfig.initialize(None)
+
+    // Collaborators handed back by the ControllerServer mock.
+    val metrics: Metrics = mock(classOf[Metrics])
+    val quotaManagers: QuotaFactory.QuotaManagers = mock(classOf[QuotaFactory.QuotaManagers])
+    when(quotaManagers.clientQuotaCallback).thenReturn(None)
+    val socketServer: SocketServer = mock(classOf[SocketServer])
+    when(socketServer.reconfigurableConfigs).thenReturn(SocketServer.ReconfigurableConfigs)
+    val authorizer = new TestAuthorizer
+
+    val controllerServer: ControllerServer = mock(classOf[kafka.server.ControllerServer])
+    when(controllerServer.config).thenReturn(initialConfig)
+    when(controllerServer.kafkaYammerMetrics).thenReturn(KafkaYammerMetrics.INSTANCE)
+    when(controllerServer.metrics).thenReturn(metrics)
+    when(controllerServer.quotaManagers).thenReturn(quotaManagers)
+    when(controllerServer.socketServer).thenReturn(socketServer)
+    when(controllerServer.authorizer).thenReturn(Some(authorizer))
+
+    controllerServer.config.dynamicConfig.addReconfigurables(controllerServer)
+    props.put("super.users", "User:admin")
+    controllerServer.config.dynamicConfig.updateBrokerConfig(0, props)
+    assertEquals("User:admin", authorizer.superUsers)
+  }
+
   @Test
   def testSynonyms(): Unit = {
     assertEquals(List("listener.name.secure.ssl.keystore.type", 
"ssl.keystore.type"),
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ControllerRequestContext.java b/metadata/src/main/java/org/apache/kafka/controller/ControllerRequestContext.java
index e4bc2f3eb4a..5585f172646 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ControllerRequestContext.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ControllerRequestContext.java
@@ -17,13 +17,14 @@
 
 package org.apache.kafka.controller;
 
-
+import org.apache.kafka.common.errors.ThrottlingQuotaExceededException;
 import org.apache.kafka.common.message.RequestHeaderData;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
 
 import java.util.OptionalLong;
+import java.util.function.Consumer;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
@@ -42,19 +43,39 @@ public class ControllerRequestContext {
     private final OptionalLong deadlineNs;
     private final RequestHeaderData requestHeader;
 
+    private final Consumer<Integer> partitionChangeQuotaApplier;
+
     public ControllerRequestContext(
         RequestHeaderData requestHeader,
         KafkaPrincipal principal,
         OptionalLong deadlineNs
+    ) {
+        this(requestHeader, principal, deadlineNs, __ -> { });
+    }
+
+    public ControllerRequestContext(
+        RequestHeaderData requestHeader,
+        KafkaPrincipal principal,
+        OptionalLong deadlineNs,
+        Consumer<Integer> partitionChangeQuotaApplier
     ) {
         this.requestHeader = requestHeader;
         this.principal = principal;
         this.deadlineNs = deadlineNs;
+        this.partitionChangeQuotaApplier = partitionChangeQuotaApplier;
     }
 
     public ControllerRequestContext(
         AuthorizableRequestContext requestContext,
         OptionalLong deadlineNs
+    ) {
+        this(requestContext, deadlineNs, __ -> { });
+    }
+
+    public ControllerRequestContext(
+        AuthorizableRequestContext requestContext,
+        OptionalLong deadlineNs,
+        Consumer<Integer> partitionChangeQuotaApplier
     ) {
         this(
             new RequestHeaderData()
@@ -63,7 +84,8 @@ public class ControllerRequestContext {
                 .setCorrelationId(requestContext.correlationId())
                 .setClientId(requestContext.clientId()),
             requestContext.principal(),
-            deadlineNs
+            deadlineNs,
+            partitionChangeQuotaApplier
         );
     }
 
@@ -78,4 +100,15 @@ public class ControllerRequestContext {
     public OptionalLong deadlineNs() {
         return deadlineNs;
     }
+
+    /**
+     * Apply the partition change quota by handing the requested partition count to the
+     * quota applier supplied at construction time. The constructors that take no applier
+     * install a no-op, in which case this call does nothing.
+     *
+     * @param requestedPartitionCount           The value to apply.
+     * @throws ThrottlingQuotaExceededException If recording this value moves a metric beyond
+     *                                          its configured maximum or minimum bound
+     */
+    public void applyPartitionChangeQuota(int requestedPartitionCount) {
+        partitionChangeQuotaApplier.accept(requestedPartitionCount);
+    }
 }
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index ff11211e343..71a1dbc4f72 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -1802,7 +1802,7 @@ public final class QuorumController implements Controller 
{
             return CompletableFuture.completedFuture(new 
CreateTopicsResponseData());
         }
         return appendWriteEvent("createTopics", context.deadlineNs(),
-            () -> replicationControl.createTopics(request, describable));
+            () -> replicationControl.createTopics(context, request, 
describable));
     }
 
     @Override
@@ -1852,7 +1852,7 @@ public final class QuorumController implements Controller 
{
         if (ids.isEmpty())
             return CompletableFuture.completedFuture(Collections.emptyMap());
         return appendWriteEvent("deleteTopics", context.deadlineNs(),
-            () -> replicationControl.deleteTopics(ids));
+            () -> replicationControl.deleteTopics(context, ids));
     }
 
     @Override
@@ -2073,7 +2073,7 @@ public final class QuorumController implements Controller 
{
         }
 
         return appendWriteEvent("createPartitions", context.deadlineNs(), () 
-> {
-            final ControllerResult<List<CreatePartitionsTopicResult>> result = 
replicationControl.createPartitions(topics);
+            final ControllerResult<List<CreatePartitionsTopicResult>> result = 
replicationControl.createPartitions(context, topics);
             if (validateOnly) {
                 log.debug("Validate-only CreatePartitions result(s): {}", 
result.response());
                 return result.withoutRecords();
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
 
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index 23113e64559..2b21d5c0b22 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -32,6 +32,7 @@ import org.apache.kafka.common.errors.InvalidRequestException;
 import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.NoReassignmentInProgressException;
 import org.apache.kafka.common.errors.PolicyViolationException;
+import org.apache.kafka.common.errors.ThrottlingQuotaExceededException;
 import org.apache.kafka.common.errors.UnknownServerException;
 import org.apache.kafka.common.errors.UnknownTopicIdException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
@@ -522,8 +523,11 @@ public class ReplicationControlManager {
         log.info("Removed topic {} with ID {}.", topic.name, record.topicId());
     }
 
-    ControllerResult<CreateTopicsResponseData>
-            createTopics(CreateTopicsRequestData request, Set<String> 
describable) {
+    ControllerResult<CreateTopicsResponseData> createTopics(
+        ControllerRequestContext context,
+        CreateTopicsRequestData request,
+        Set<String> describable
+    ) {
         Map<String, ApiError> topicErrors = new HashMap<>();
         List<ApiMessageAndVersion> records = new ArrayList<>();
 
@@ -562,7 +566,7 @@ public class ReplicationControlManager {
             }
             ApiError error;
             try {
-                error = createTopic(topic, records, successes, configRecords, 
describable.contains(topic.name()));
+                error = createTopic(context, topic, records, successes, 
configRecords, describable.contains(topic.name()));
             } catch (ApiException e) {
                 error = ApiError.fromThrowable(e);
             }
@@ -602,7 +606,8 @@ public class ReplicationControlManager {
         }
     }
 
-    private ApiError createTopic(CreatableTopic topic,
+    private ApiError createTopic(ControllerRequestContext context,
+                                 CreatableTopic topic,
                                  List<ApiMessageAndVersion> records,
                                  Map<String, CreatableTopicResult> successes,
                                  List<ApiMessageAndVersion> configRecords,
@@ -704,6 +709,14 @@ public class ReplicationControlManager {
                 topic.name(), numPartitions, replicationFactor, null, 
creationConfigs));
             if (error.isFailure()) return error;
         }
+        int numPartitions = newParts.size();
+        try {
+            context.applyPartitionChangeQuota(numPartitions); // check 
controller mutation quota
+        } catch (ThrottlingQuotaExceededException e) {
+            log.debug("Topic creation of {} partitions not allowed because 
quota is violated. Delay time: {}",
+                numPartitions, e.throttleTimeMs());
+            return ApiError.fromThrowable(e);
+        }
         Uuid topicId = Uuid.randomUuid();
         CreatableTopicResult result = new CreatableTopicResult().
             setName(topic.name()).
@@ -724,7 +737,7 @@ public class ReplicationControlManager {
                     
setConfigSource(KafkaConfigSchema.translateConfigSource(entry.source()).id()).
                     setIsSensitive(entry.isSensitive()));
             }
-            result.setNumPartitions(newParts.size());
+            result.setNumPartitions(numPartitions);
             result.setReplicationFactor((short) 
newParts.values().iterator().next().replicas.length);
             result.setTopicConfigErrorCode(NONE.code());
         } else {
@@ -847,12 +860,12 @@ public class ReplicationControlManager {
         return results;
     }
 
-    ControllerResult<Map<Uuid, ApiError>> deleteTopics(Collection<Uuid> ids) {
+    ControllerResult<Map<Uuid, ApiError>> 
deleteTopics(ControllerRequestContext context, Collection<Uuid> ids) {
         Map<Uuid, ApiError> results = new HashMap<>(ids.size());
         List<ApiMessageAndVersion> records = new ArrayList<>(ids.size());
         for (Uuid id : ids) {
             try {
-                deleteTopic(id, records);
+                deleteTopic(context, id, records);
                 results.put(id, ApiError.NONE);
             } catch (ApiException e) {
                 results.put(id, ApiError.fromThrowable(e));
@@ -864,11 +877,20 @@ public class ReplicationControlManager {
         return ControllerResult.atomicOf(records, results);
     }
 
-    void deleteTopic(Uuid id, List<ApiMessageAndVersion> records) {
+    void deleteTopic(ControllerRequestContext context, Uuid id, 
List<ApiMessageAndVersion> records) {
         TopicControlInfo topic = topics.get(id);
         if (topic == null) {
             throw new UnknownTopicIdException(UNKNOWN_TOPIC_ID.message());
         }
+        int numPartitions = topic.parts.size();
+        try {
+            context.applyPartitionChangeQuota(numPartitions); // check 
controller mutation quota
+        } catch (ThrottlingQuotaExceededException e) {
+            // log a message and rethrow the exception
+            log.debug("Topic deletion of {} partitions not allowed because 
quota is violated. Delay time: {}",
+                numPartitions, e.throttleTimeMs());
+            throw e;
+        }
         records.add(new ApiMessageAndVersion(new RemoveTopicRecord().
             setTopicId(id), (short) 0));
     }
@@ -1457,14 +1479,16 @@ public class ReplicationControlManager {
         return ControllerResult.of(records, rescheduleImmidiately);
     }
 
-    ControllerResult<List<CreatePartitionsTopicResult>>
-            createPartitions(List<CreatePartitionsTopic> topics) {
+    ControllerResult<List<CreatePartitionsTopicResult>> createPartitions(
+        ControllerRequestContext context,
+        List<CreatePartitionsTopic> topics
+    ) {
         List<ApiMessageAndVersion> records = new ArrayList<>();
         List<CreatePartitionsTopicResult> results = new ArrayList<>();
         for (CreatePartitionsTopic topic : topics) {
             ApiError apiError = ApiError.NONE;
             try {
-                createPartitions(topic, records);
+                createPartitions(context, topic, records);
             } catch (ApiException e) {
                 apiError = ApiError.fromThrowable(e);
             } catch (Exception e) {
@@ -1479,7 +1503,8 @@ public class ReplicationControlManager {
         return ControllerResult.atomicOf(records, results);
     }
 
-    void createPartitions(CreatePartitionsTopic topic,
+    void createPartitions(ControllerRequestContext context,
+                          CreatePartitionsTopic topic,
                           List<ApiMessageAndVersion> records) {
         Uuid topicId = topicsByName.get(topic.name());
         if (topicId == null) {
@@ -1505,6 +1530,14 @@ public class ReplicationControlManager {
                     " assignment(s) were specified.");
             }
         }
+        try {
+            context.applyPartitionChangeQuota(additional); // check controller 
mutation quota
+        } catch (ThrottlingQuotaExceededException e) {
+            // log a message and rethrow the exception
+            log.debug("Partition creation of {} partitions not allowed because 
quota is violated. Delay time: {}",
+                additional, e.throttleTimeMs());
+            throw e;
+        }
         Iterator<PartitionRegistration> iterator = 
topicInfo.parts.values().iterator();
         if (!iterator.hasNext()) {
             throw new UnknownServerException("Invalid state: topic " + 
topic.name() +
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/ControllerRequestContextUtil.java
 
b/metadata/src/test/java/org/apache/kafka/controller/ControllerRequestContextUtil.java
index 8d70a2d82f5..21153ae7a98 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/ControllerRequestContextUtil.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/ControllerRequestContextUtil.java
@@ -18,6 +18,9 @@
 package org.apache.kafka.controller;
 
 import java.util.OptionalLong;
+import java.util.function.Consumer;
+
+import org.apache.kafka.common.errors.ThrottlingQuotaExceededException;
 import org.apache.kafka.common.message.RequestHeaderData;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
@@ -28,21 +31,37 @@ public class ControllerRequestContextUtil {
             new RequestHeaderData(),
             KafkaPrincipal.ANONYMOUS,
             OptionalLong.empty());
+    public static final String QUOTA_EXCEEDED_IN_TEST_MSG = "Quota exceeded in 
test";
 
     public static ControllerRequestContext anonymousContextFor(ApiKeys 
apiKeys) {
-        return anonymousContextFor(apiKeys, apiKeys.latestVersion());
+        return anonymousContextFor(apiKeys, apiKeys.latestVersion(), __ -> { 
});
+    }
+
+    public static ControllerRequestContext 
anonymousContextWithMutationQuotaExceededFor(ApiKeys apiKeys) {
+        return anonymousContextFor(apiKeys, apiKeys.latestVersion(), x -> {
+            throw new 
ThrottlingQuotaExceededException(QUOTA_EXCEEDED_IN_TEST_MSG);
+        });
     }
 
     public static ControllerRequestContext anonymousContextFor(
         ApiKeys apiKeys,
         short version
+    ) {
+        return anonymousContextFor(apiKeys, version, __ -> { });
+    }
+
+    public static ControllerRequestContext anonymousContextFor(
+        ApiKeys apiKeys,
+        short version,
+        Consumer<Integer> partitionChangeQuotaApplier
     ) {
         return new ControllerRequestContext(
             new RequestHeaderData()
                 .setRequestApiKey(apiKeys.id)
                 .setRequestApiVersion(version),
             KafkaPrincipal.ANONYMOUS,
-            OptionalLong.empty()
+            OptionalLong.empty(),
+            partitionChangeQuotaApplier
         );
     }
 }
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
 
b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index 76f8ffb4548..975062fbde7 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -128,9 +128,12 @@ import static 
org.apache.kafka.common.protocol.Errors.NO_REASSIGNMENT_IN_PROGRES
 import static org.apache.kafka.common.protocol.Errors.OPERATION_NOT_ATTEMPTED;
 import static org.apache.kafka.common.protocol.Errors.POLICY_VIOLATION;
 import static 
org.apache.kafka.common.protocol.Errors.PREFERRED_LEADER_NOT_AVAILABLE;
+import static 
org.apache.kafka.common.protocol.Errors.THROTTLING_QUOTA_EXCEEDED;
 import static org.apache.kafka.common.protocol.Errors.UNKNOWN_TOPIC_ID;
 import static 
org.apache.kafka.common.protocol.Errors.UNKNOWN_TOPIC_OR_PARTITION;
+import static 
org.apache.kafka.controller.ControllerRequestContextUtil.QUOTA_EXCEEDED_IN_TEST_MSG;
 import static 
org.apache.kafka.controller.ControllerRequestContextUtil.anonymousContextFor;
+import static 
org.apache.kafka.controller.ControllerRequestContextUtil.anonymousContextWithMutationQuotaExceededFor;
 import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -219,8 +222,9 @@ public class ReplicationControlManagerTest {
             CreatableTopic topic = new CreatableTopic().setName(name);
             
topic.setNumPartitions(numPartitions).setReplicationFactor(replicationFactor);
             request.topics().add(topic);
+            ControllerRequestContext requestContext = 
anonymousContextFor(ApiKeys.CREATE_TOPICS);
             ControllerResult<CreateTopicsResponseData> result =
-                replicationControl.createTopics(request, 
Collections.singleton(name));
+                replicationControl.createTopics(requestContext, request, 
Collections.singleton(name));
             CreatableTopicResult topicResult = 
result.response().topics().find(name);
             assertNotNull(topicResult);
             assertEquals(expectedErrorCode, topicResult.errorCode());
@@ -254,8 +258,9 @@ public class ReplicationControlManagerTest {
                 new 
CreateTopicsRequestData.CreateableTopicConfig().setName(e.getKey()).
                     setValue(e.getValue())));
             request.topics().add(topic);
+            ControllerRequestContext requestContext = 
anonymousContextFor(ApiKeys.CREATE_TOPICS);
             ControllerResult<CreateTopicsResponseData> result =
-                replicationControl.createTopics(request, 
Collections.singleton(name));
+                replicationControl.createTopics(requestContext, request, 
Collections.singleton(name));
             CreatableTopicResult topicResult = 
result.response().topics().find(name);
             assertNotNull(topicResult);
             assertEquals(expectedErrorCode, topicResult.errorCode());
@@ -267,8 +272,8 @@ public class ReplicationControlManagerTest {
             return topicResult;
         }
 
-        void deleteTopic(Uuid topicId) throws Exception {
-            ControllerResult<Map<Uuid, ApiError>> result = 
replicationControl.deleteTopics(Collections.singleton(topicId));
+        void deleteTopic(ControllerRequestContext context, Uuid topicId) 
throws Exception {
+            ControllerResult<Map<Uuid, ApiError>> result = 
replicationControl.deleteTopics(context, Collections.singleton(topicId));
             assertEquals(Collections.singleton(topicId), 
result.response().keySet());
             assertEquals(NONE, result.response().get(topicId).error());
             assertEquals(1, result.records().size());
@@ -292,8 +297,9 @@ public class ReplicationControlManagerTest {
                 topic.assignments().add(new CreatePartitionsAssignment().
                     setBrokerIds(Replicas.toList(replicas[i])));
             }
+            ControllerRequestContext requestContext = 
anonymousContextFor(ApiKeys.CREATE_PARTITIONS);
             ControllerResult<List<CreatePartitionsTopicResult>> result =
-                
replicationControl.createPartitions(Collections.singletonList(topic));
+                replicationControl.createPartitions(requestContext, 
Collections.singletonList(topic));
             assertEquals(1, result.response().size());
             CreatePartitionsTopicResult topicResult = result.response().get(0);
             assertEquals(name, topicResult.name());
@@ -471,8 +477,9 @@ public class ReplicationControlManagerTest {
         request.topics().add(new CreatableTopic().setName("foo").
             setNumPartitions(-1).setReplicationFactor((short) -1));
 
+        ControllerRequestContext requestContext = 
anonymousContextFor(ApiKeys.CREATE_TOPICS);
         ControllerResult<CreateTopicsResponseData> result =
-            replicationControl.createTopics(request, 
Collections.singleton("foo"));
+            replicationControl.createTopics(requestContext, request, 
Collections.singleton("foo"));
         CreateTopicsResponseData expectedResponse = new 
CreateTopicsResponseData();
         expectedResponse.topics().add(new 
CreatableTopicResult().setName("foo").
             setErrorCode(INVALID_REPLICATION_FACTOR.code()).
@@ -485,7 +492,7 @@ public class ReplicationControlManagerTest {
         ctx.inControlledShutdownBrokers(0);
 
         ControllerResult<CreateTopicsResponseData> result2 =
-            replicationControl.createTopics(request, 
Collections.singleton("foo"));
+            replicationControl.createTopics(requestContext, request, 
Collections.singleton("foo"));
         CreateTopicsResponseData expectedResponse2 = new 
CreateTopicsResponseData();
         expectedResponse2.topics().add(new 
CreatableTopicResult().setName("foo").
             setErrorCode(INVALID_REPLICATION_FACTOR.code()).
@@ -497,7 +504,7 @@ public class ReplicationControlManagerTest {
         ctx.unfenceBrokers(0, 1, 2);
 
         ControllerResult<CreateTopicsResponseData> result3 =
-            replicationControl.createTopics(request, 
Collections.singleton("foo"));
+            replicationControl.createTopics(requestContext, request, 
Collections.singleton("foo"));
         CreateTopicsResponseData expectedResponse3 = new 
CreateTopicsResponseData();
         expectedResponse3.topics().add(new 
CreatableTopicResult().setName("foo").
             setNumPartitions(1).setReplicationFactor((short) 3).
@@ -510,7 +517,7 @@ public class ReplicationControlManagerTest {
             replicationControl.getPartition(
                 ((TopicRecord) result3.records().get(0).message()).topicId(), 
0));
         ControllerResult<CreateTopicsResponseData> result4 =
-                replicationControl.createTopics(request, 
Collections.singleton("foo"));
+                replicationControl.createTopics(requestContext, request, 
Collections.singleton("foo"));
         CreateTopicsResponseData expectedResponse4 = new 
CreateTopicsResponseData();
         expectedResponse4.topics().add(new 
CreatableTopicResult().setName("foo").
                 setErrorCode(Errors.TOPIC_ALREADY_EXISTS.code()).
@@ -518,6 +525,26 @@ public class ReplicationControlManagerTest {
         assertEquals(expectedResponse4, result4.response());
     }
 
+    @Test
+    public void testCreateTopicsWithMutationQuotaExceeded() throws Exception {
+        ReplicationControlTestContext ctx = new 
ReplicationControlTestContext();
+        ReplicationControlManager replicationControl = ctx.replicationControl;
+        CreateTopicsRequestData request = new CreateTopicsRequestData();
+        request.topics().add(new CreatableTopic().setName("foo").
+            setNumPartitions(-1).setReplicationFactor((short) -1));
+        ctx.registerBrokers(0, 1, 2);
+        ctx.unfenceBrokers(0, 1, 2);
+        ControllerRequestContext requestContext =
+            
anonymousContextWithMutationQuotaExceededFor(ApiKeys.CREATE_TOPICS);
+        ControllerResult<CreateTopicsResponseData> result =
+            replicationControl.createTopics(requestContext, request, 
Collections.singleton("foo"));
+        CreateTopicsResponseData expectedResponse = new 
CreateTopicsResponseData();
+        expectedResponse.topics().add(new 
CreatableTopicResult().setName("foo").
+            setErrorCode(THROTTLING_QUOTA_EXCEEDED.code()).
+            setErrorMessage(QUOTA_EXCEEDED_IN_TEST_MSG));
+        assertEquals(expectedResponse, result.response());
+    }
+
     @Test
     public void testCreateTopicsISRInvariants() throws Exception {
         ReplicationControlTestContext ctx = new 
ReplicationControlTestContext();
@@ -531,8 +558,9 @@ public class ReplicationControlManagerTest {
         ctx.unfenceBrokers(0, 1);
         ctx.inControlledShutdownBrokers(1);
 
+        ControllerRequestContext requestContext = 
anonymousContextFor(ApiKeys.CREATE_TOPICS);
         ControllerResult<CreateTopicsResponseData> result =
-            replicationControl.createTopics(request, 
Collections.singleton("foo"));
+            replicationControl.createTopics(requestContext, request, 
Collections.singleton("foo"));
 
         CreateTopicsResponseData expectedResponse = new 
CreateTopicsResponseData();
         expectedResponse.topics().add(new 
CreatableTopicResult().setName("foo").
@@ -577,8 +605,9 @@ public class ReplicationControlManagerTest {
             .setNumPartitions(-1).setReplicationFactor((short) -1)
             .setConfigs(validConfigs));
 
+        ControllerRequestContext requestContext = 
anonymousContextFor(ApiKeys.CREATE_TOPICS);
         ControllerResult<CreateTopicsResponseData> result1 =
-            replicationControl.createTopics(request1, 
Collections.singleton("foo"));
+            replicationControl.createTopics(requestContext, request1, 
Collections.singleton("foo"));
         assertEquals((short) 0, 
result1.response().topics().find("foo").errorCode());
 
         List<ApiMessageAndVersion> records1 = result1.records();
@@ -611,7 +640,7 @@ public class ReplicationControlManagerTest {
             .setConfigs(invalidConfigs));
 
         ControllerResult<CreateTopicsResponseData> result2 =
-            replicationControl.createTopics(request2, 
Collections.singleton("bar"));
+            replicationControl.createTopics(requestContext, request2, 
Collections.singleton("bar"));
         assertEquals(Errors.INVALID_CONFIG.code(), 
result2.response().topics().find("bar").errorCode());
         assertEquals(
             "Null value not supported for topic configs: foo",
@@ -624,7 +653,7 @@ public class ReplicationControlManagerTest {
             .setConfigs(validConfigs));
 
         ControllerResult<CreateTopicsResponseData> result3 =
-            replicationControl.createTopics(request3, 
Collections.singleton("baz"));
+            replicationControl.createTopics(requestContext, request3, 
Collections.singleton("baz"));
         assertEquals(INVALID_REPLICATION_FACTOR.code(), 
result3.response().topics().find("baz").errorCode());
         assertEquals(Collections.emptyList(), result3.records());
 
@@ -643,7 +672,7 @@ public class ReplicationControlManagerTest {
         request4Topics.add(batchedTopic1);
         request4Topics.add(batchedTopic2);
         ControllerResult<CreateTopicsResponseData> result4 =
-            replicationControl.createTopics(request4, request4Topics);
+            replicationControl.createTopics(requestContext, request4, 
request4Topics);
 
         assertEquals(Errors.NONE.code(), 
result4.response().topics().find(batchedTopic1).errorCode());
         assertEquals(INVALID_REPLICATION_FACTOR.code(), 
result4.response().topics().find(batchedTopic2).errorCode());
@@ -662,19 +691,27 @@ public class ReplicationControlManagerTest {
         assertEquals(batchedTopic1Record.topicId(), ((PartitionRecord) 
result4.records().get(2).message()).topicId());
     }
 
-    @Test
-    public void testCreateTopicsWithValidateOnlyFlag() throws Exception {
+    @ParameterizedTest(name = "testCreateTopicsWithValidateOnlyFlag with 
mutationQuotaExceeded: {0}")
+    @ValueSource(booleans = {true, false})
+    public void testCreateTopicsWithValidateOnlyFlag(boolean 
mutationQuotaExceeded) throws Exception {
         ReplicationControlTestContext ctx = new 
ReplicationControlTestContext();
         ctx.registerBrokers(0, 1, 2);
         ctx.unfenceBrokers(0, 1, 2);
         CreateTopicsRequestData request = new 
CreateTopicsRequestData().setValidateOnly(true);
         request.topics().add(new CreatableTopic().setName("foo").
             setNumPartitions(1).setReplicationFactor((short) 3));
+        ControllerRequestContext requestContext = mutationQuotaExceeded ?
+            
anonymousContextWithMutationQuotaExceededFor(ApiKeys.CREATE_TOPICS) :
+            anonymousContextFor(ApiKeys.CREATE_TOPICS);
         ControllerResult<CreateTopicsResponseData> result =
-            ctx.replicationControl.createTopics(request, 
Collections.singleton("foo"));
+            ctx.replicationControl.createTopics(requestContext, request, 
Collections.singleton("foo"));
         assertEquals(0, result.records().size());
         CreatableTopicResult topicResult = 
result.response().topics().find("foo");
-        assertEquals((short) 0, topicResult.errorCode());
+        if (mutationQuotaExceeded) {
+            assertEquals(THROTTLING_QUOTA_EXCEEDED.code(), 
topicResult.errorCode());
+        } else {
+            assertEquals(NONE.code(), topicResult.errorCode());
+        }
     }
 
     @Test
@@ -685,8 +722,9 @@ public class ReplicationControlManagerTest {
         CreateTopicsRequestData request = new 
CreateTopicsRequestData().setValidateOnly(true);
         request.topics().add(new CreatableTopic().setName("foo").
             setNumPartitions(1).setReplicationFactor((short) 4));
+        ControllerRequestContext requestContext = 
anonymousContextFor(ApiKeys.CREATE_TOPICS);
         ControllerResult<CreateTopicsResponseData> result =
-            ctx.replicationControl.createTopics(request, 
Collections.singleton("foo"));
+            ctx.replicationControl.createTopics(requestContext, request, 
Collections.singleton("foo"));
         assertEquals(0, result.records().size());
         CreateTopicsResponseData expectedResponse = new 
CreateTopicsResponseData();
         expectedResponse.topics().add(new 
CreatableTopicResult().setName("foo").
@@ -728,7 +766,8 @@ public class ReplicationControlManagerTest {
 
         CreatableTopicResult initialTopic = ctx.createTestTopic("foo.bar", 2, 
(short) 2, NONE.code());
         assertEquals(2, 
ctx.replicationControl.getTopic(initialTopic.topicId()).numPartitions(Long.MAX_VALUE));
-        ctx.deleteTopic(initialTopic.topicId());
+        ControllerRequestContext requestContext = 
anonymousContextFor(ApiKeys.DELETE_TOPICS);
+        ctx.deleteTopic(requestContext, initialTopic.topicId());
 
         CreatableTopicResult recreatedTopic = ctx.createTestTopic("foo.bar", 
4, (short) 2, NONE.code());
         assertNotEquals(initialTopic.topicId(), recreatedTopic.topicId());
@@ -1050,8 +1089,9 @@ public class ReplicationControlManagerTest {
             setConfigs(requestConfigs));
         ctx.registerBrokers(0, 1);
         ctx.unfenceBrokers(0, 1);
+        ControllerRequestContext createTopicsRequestContext = 
anonymousContextFor(ApiKeys.CREATE_TOPICS);
         ControllerResult<CreateTopicsResponseData> createResult =
-            replicationControl.createTopics(request, 
Collections.singleton("foo"));
+            replicationControl.createTopics(createTopicsRequestContext, 
request, Collections.singleton("foo"));
         CreateTopicsResponseData expectedResponse = new 
CreateTopicsResponseData();
         Uuid topicId = createResult.response().topics().find("foo").topicId();
         expectedResponse.topics().add(new 
CreatableTopicResult().setName("foo").
@@ -1082,13 +1122,14 @@ public class ReplicationControlManagerTest {
             new ResultOrError<>(new ApiError(UNKNOWN_TOPIC_OR_PARTITION))),
                 replicationControl.findTopicIds(Long.MAX_VALUE, 
Collections.singleton("bar")));
 
+        ControllerRequestContext deleteTopicsRequestContext = 
anonymousContextFor(ApiKeys.DELETE_TOPICS);
         ControllerResult<Map<Uuid, ApiError>> invalidDeleteResult = 
replicationControl.
-            deleteTopics(Collections.singletonList(invalidId));
+            deleteTopics(deleteTopicsRequestContext, 
Collections.singletonList(invalidId));
         assertEquals(0, invalidDeleteResult.records().size());
         assertEquals(singletonMap(invalidId, new ApiError(UNKNOWN_TOPIC_ID, 
null)),
             invalidDeleteResult.response());
         ControllerResult<Map<Uuid, ApiError>> deleteResult = 
replicationControl.
-            deleteTopics(Collections.singletonList(topicId));
+            deleteTopics(deleteTopicsRequestContext, 
Collections.singletonList(topicId));
         assertTrue(deleteResult.isAtomic());
         assertEquals(singletonMap(topicId, new ApiError(NONE, null)),
             deleteResult.response());
@@ -1107,6 +1148,33 @@ public class ReplicationControlManagerTest {
         assertEmptyTopicConfigs(ctx, "foo");
     }
 
+    @Test
+    public void testDeleteTopicsWithMutationQuotaExceeded() throws Exception {
+        ReplicationControlTestContext ctx = new 
ReplicationControlTestContext();
+        ReplicationControlManager replicationControl = ctx.replicationControl;
+        CreateTopicsRequestData request = new CreateTopicsRequestData();
+        request.topics().add(new CreatableTopic().setName("foo").
+            setNumPartitions(3).setReplicationFactor((short) 2));
+        ctx.registerBrokers(0, 1);
+        ctx.unfenceBrokers(0, 1);
+        ControllerRequestContext createTopicsRequestContext =
+            anonymousContextFor(ApiKeys.CREATE_TOPICS);
+        ControllerResult<CreateTopicsResponseData> createResult =
+            replicationControl.createTopics(createTopicsRequestContext, 
request, Collections.singleton("foo"));
+        CreateTopicsResponseData expectedResponse = new 
CreateTopicsResponseData();
+        CreatableTopicResult createdTopic = 
createResult.response().topics().find("foo");
+        assertEquals(NONE.code(), createdTopic.errorCode());
+        ctx.replay(createResult.records());
+        ControllerRequestContext deleteTopicsRequestContext =
+            
anonymousContextWithMutationQuotaExceededFor(ApiKeys.DELETE_TOPICS);
+        Uuid topicId = createdTopic.topicId();
+        ControllerResult<Map<Uuid, ApiError>> deleteResult = 
replicationControl.
+            deleteTopics(deleteTopicsRequestContext, 
Collections.singletonList(topicId));
+        assertEquals(singletonMap(topicId, new 
ApiError(THROTTLING_QUOTA_EXCEEDED, QUOTA_EXCEEDED_IN_TEST_MSG)),
+            deleteResult.response());
+        assertEquals(0, deleteResult.records().size());
+    }
+
     @Test
     public void testCreatePartitions() throws Exception {
         ReplicationControlTestContext ctx = new 
ReplicationControlTestContext();
@@ -1122,8 +1190,9 @@ public class ReplicationControlManagerTest {
             setNumPartitions(2).setReplicationFactor((short) 2));
         ctx.registerBrokers(0, 1);
         ctx.unfenceBrokers(0, 1);
+        ControllerRequestContext requestContext = 
anonymousContextFor(ApiKeys.CREATE_TOPICS);
         ControllerResult<CreateTopicsResponseData> createTopicResult = 
replicationControl.
-            createTopics(request, new HashSet<>(Arrays.asList("foo", "bar", 
"quux", "foo2")));
+            createTopics(requestContext, request, new 
HashSet<>(Arrays.asList("foo", "bar", "quux", "foo2")));
         ctx.replay(createTopicResult.records());
         List<CreatePartitionsTopic> topics = new ArrayList<>();
         topics.add(new CreatePartitionsTopic().
@@ -1135,7 +1204,7 @@ public class ReplicationControlManagerTest {
         topics.add(new CreatePartitionsTopic().
             setName("quux").setCount(2).setAssignments(null));
         ControllerResult<List<CreatePartitionsTopicResult>> 
createPartitionsResult =
-            replicationControl.createPartitions(topics);
+            replicationControl.createPartitions(requestContext, topics);
         assertEquals(asList(new CreatePartitionsTopicResult().
                 setName("foo").
                 setErrorCode(NONE.code()).
@@ -1168,7 +1237,7 @@ public class ReplicationControlManagerTest {
             setName("foo2").setCount(3).setAssignments(asList(
             new CreatePartitionsAssignment().setBrokerIds(asList(2, 0)))));
         ControllerResult<List<CreatePartitionsTopicResult>> 
createPartitionsResult2 =
-            replicationControl.createPartitions(topics2);
+            replicationControl.createPartitions(requestContext, topics2);
         assertEquals(asList(new CreatePartitionsTopicResult().
                 setName("foo").
                 setErrorCode(NONE.code()).
@@ -1192,6 +1261,45 @@ public class ReplicationControlManagerTest {
         ctx.replay(createPartitionsResult2.records());
     }
 
+    @Test
+    public void testCreatePartitionsWithMutationQuotaExceeded() throws 
Exception {
+        ReplicationControlTestContext ctx = new 
ReplicationControlTestContext();
+        ReplicationControlManager replicationControl = ctx.replicationControl;
+        CreateTopicsRequestData request = new CreateTopicsRequestData();
+        request.topics().add(new CreatableTopic().setName("foo").
+            setNumPartitions(3).setReplicationFactor((short) 2));
+        ctx.registerBrokers(0, 1);
+        ctx.unfenceBrokers(0, 1);
+        ControllerRequestContext createTopicsRequestContext =
+            anonymousContextFor(ApiKeys.CREATE_TOPICS);
+        ControllerResult<CreateTopicsResponseData> createResult =
+            replicationControl.createTopics(createTopicsRequestContext, 
request, Collections.singleton("foo"));
+        CreateTopicsResponseData expectedResponse = new 
CreateTopicsResponseData();
+        CreatableTopicResult createdTopic = 
createResult.response().topics().find("foo");
+        assertEquals(NONE.code(), createdTopic.errorCode());
+        ctx.replay(createResult.records());
+        List<CreatePartitionsTopic> topics = new ArrayList<>();
+        topics.add(new CreatePartitionsTopic().
+            setName("foo").setCount(5).setAssignments(null));
+        ControllerRequestContext createPartitionsRequestContext =
+            
anonymousContextWithMutationQuotaExceededFor(ApiKeys.CREATE_PARTITIONS);
+        ControllerResult<List<CreatePartitionsTopicResult>> 
createPartitionsResult =
+            
replicationControl.createPartitions(createPartitionsRequestContext, topics);
+        List<CreatePartitionsTopicResult> expectedThrottled = 
singletonList(new CreatePartitionsTopicResult().
+            setName("foo").
+            setErrorCode(THROTTLING_QUOTA_EXCEEDED.code()).
+            setErrorMessage(QUOTA_EXCEEDED_IN_TEST_MSG));
+        assertEquals(expectedThrottled, createPartitionsResult.response());
+        // now test the explicit assignment case
+        List<CreatePartitionsTopic> topics2 = new ArrayList<>();
+        topics2.add(new CreatePartitionsTopic().
+            setName("foo").setCount(4).setAssignments(asList(
+                new CreatePartitionsAssignment().setBrokerIds(asList(1, 0)))));
+        ControllerResult<List<CreatePartitionsTopicResult>> 
createPartitionsResult2 =
+            
replicationControl.createPartitions(createPartitionsRequestContext, topics2);
+        assertEquals(expectedThrottled, createPartitionsResult2.response());
+    }
+
     @Test
     public void 
testCreatePartitionsFailsWhenAllBrokersAreFencedOrInControlledShutdown() throws 
Exception {
         ReplicationControlTestContext ctx = new 
ReplicationControlTestContext();
@@ -1203,8 +1311,10 @@ public class ReplicationControlManagerTest {
         ctx.registerBrokers(0, 1);
         ctx.unfenceBrokers(0, 1);
 
+        ControllerRequestContext requestContext =
+                anonymousContextFor(ApiKeys.CREATE_TOPICS);
         ControllerResult<CreateTopicsResponseData> createTopicResult = 
replicationControl.
-            createTopics(request, new HashSet<>(Arrays.asList("foo")));
+            createTopics(requestContext, request, new 
HashSet<>(Arrays.asList("foo")));
         ctx.replay(createTopicResult.records());
 
         ctx.registerBrokers(0, 1);
@@ -1215,7 +1325,7 @@ public class ReplicationControlManagerTest {
         topics.add(new CreatePartitionsTopic().
             setName("foo").setCount(2).setAssignments(null));
         ControllerResult<List<CreatePartitionsTopicResult>> 
createPartitionsResult =
-            replicationControl.createPartitions(topics);
+            replicationControl.createPartitions(requestContext, topics);
 
         assertEquals(
             asList(new CreatePartitionsTopicResult().
@@ -1239,15 +1349,16 @@ public class ReplicationControlManagerTest {
         ctx.unfenceBrokers(0, 1);
         ctx.inControlledShutdownBrokers(1);
 
+        ControllerRequestContext requestContext = 
anonymousContextFor(ApiKeys.CREATE_TOPICS);
         ControllerResult<CreateTopicsResponseData> result =
-            replicationControl.createTopics(request, 
Collections.singleton("foo"));
+            replicationControl.createTopics(requestContext, request, 
Collections.singleton("foo"));
         ctx.replay(result.records());
 
         List<CreatePartitionsTopic> topics = asList(new 
CreatePartitionsTopic().
             setName("foo").setCount(2).setAssignments(null));
 
         ControllerResult<List<CreatePartitionsTopicResult>> 
createPartitionsResult =
-            replicationControl.createPartitions(topics);
+            replicationControl.createPartitions(requestContext, topics);
         ctx.replay(createPartitionsResult.records());
 
         // Broker 2 cannot be in the ISR because it is fenced and broker 1
diff --git a/tests/kafkatest/tests/core/controller_mutation_quota_test.py 
b/tests/kafkatest/tests/core/controller_mutation_quota_test.py
new file mode 100644
index 00000000000..b5578ce857e
--- /dev/null
+++ b/tests/kafkatest/tests/core/controller_mutation_quota_test.py
@@ -0,0 +1,142 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.mark.resource import cluster
+from ducktape.mark import matrix
+from ducktape.tests.test import Test
+
+from kafkatest.services.trogdor.produce_bench_workload import 
ProduceBenchWorkloadService, ProduceBenchWorkloadSpec
+from kafkatest.services.trogdor.consume_bench_workload import 
ConsumeBenchWorkloadService, ConsumeBenchWorkloadSpec
+from kafkatest.services.trogdor.task_spec import TaskSpec
+from kafkatest.services.kafka import KafkaService, quorum
+from kafkatest.services.trogdor.trogdor import TrogdorService
+from kafkatest.services.zookeeper import ZookeeperService
+
+import time
+
+
+class ControllerMutationQuotaTest(Test):
+    """Tests throttled partition changes via the kafka-topics CLI as follows:
+
+    1. Create a topic with N partitions (creating N partitions)
+    2. Alter the number of topic partitions to 2N (adding N partitions)
+    3. Delete topic (removing 2N partitions)
+    4. Create topic with 1 partition
+
+    The mutation quota should be checked at the beginning of each operation,
+    so the checks are expected to be as follows assuming 1 window for the 
entire set of operations:
+
+    Before step 2: alter will be allowed if quota > N
+    Before step 3: delete will be allowed if quota > 2N
+    Before step 4: create will be allowed if quota > 4N
+
+    Therefore, if we want steps 1-3 to succeed and step 4 to fail, we need 
2N < quota < 4N
+
+    For example, we can achieve the desired behavior (steps 1-3 succeed, step 
4 fails)
+    for N = 10 partitions by setting the following configs:
+
+    controller.quota.window.num=10
+    controller.quota.window.size.seconds=200
+    controller_mutation_rate=X where (20 < 2000X < 40), therefore (0.01 < X < 
0.02)
+    """
+    def __init__(self, test_context):
+        super(ControllerMutationQuotaTest, 
self).__init__(test_context=test_context)
+        self.test_context = test_context
+        self.zk = ZookeeperService(test_context, num_nodes=1) if 
quorum.for_test(test_context) == quorum.zk else None
+        self.window_num = 10
+        self.window_size_seconds = 200 # must be long enough such that all CLI 
commands fit into it
+
+        self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk,
+            server_prop_overrides=[
+                ["quota.window.num", "%s" % self.window_num],
+                ["controller.quota.window.size.seconds", "%s" % 
self.window_size_seconds]
+            ],
+            controller_num_nodes_override=1)
+
+    def setUp(self):
+        if self.zk:
+            self.zk.start()
+        self.kafka.start()
+
+    def teardown(self):
+        # Need to increase the timeout due to partition count
+        self.kafka.stop()
+        if self.zk:
+            self.zk.stop()
+
+    @cluster(num_nodes=2)
+    @matrix(metadata_quorum=quorum.all)
+    def test_controller_mutation_quota(self, metadata_quorum=quorum.zk):
+        self.partition_count = 10
+        mutation_rate = 3 * self.partition_count / (self.window_num * 
self.window_size_seconds)
+
+        # first invoke the steps with no mutation quota to ensure they succeed 
in general
+        self.mutate_partitions("topic_succeed", True)
+
+        # now apply the mutation quota configs
+        node = self.kafka.nodes[0]
+        alter_mutation_quota_cmd = "%s --entity-type users --entity-default 
--alter --add-config 'controller_mutation_rate=%f'" % \
+               
(self.kafka.kafka_configs_cmd_with_optional_security_settings(node, 
force_use_zk_connection=False), mutation_rate)
+        node.account.ssh(alter_mutation_quota_cmd)
+
+        # now invoke the same steps with the mutation quota applied to ensure 
the final create fails
+        self.mutate_partitions("topic_fail", False)
+
+    def mutate_partitions(self, topic_prefix, expected_to_succeed):
+        try:
+            # create the topic
+            topic_name = topic_prefix
+            topic_cfg = {
+                "topic": topic_name,
+                "partitions": self.partition_count,
+                "replication-factor": 1,
+            }
+            self.kafka.create_topic(topic_cfg)
+
+            # alter the partition count
+            node = self.kafka.nodes[0]
+            alter_topic_cmd = "%s --alter --topic %s --partitions %i" % \
+                   
(self.kafka.kafka_topics_cmd_with_optional_security_settings(node, 
force_use_zk_connection=False),
+                   topic_name, 2 * self.partition_count)
+            node.account.ssh(alter_topic_cmd)
+
+            # delete the topic
+            self.kafka.delete_topic(topic_name)
+        except RuntimeError as e:
+            fail_msg = "Failure: Unexpected Exception - %s" % str(e)
+            self.logger.error(fail_msg)
+            raise RuntimeError(fail_msg)
+
+        # create a second topic with a single partition
+        topic_cfg = {
+            "topic": "%s_b" % topic_prefix,
+            "partitions": 1,
+            "replication-factor": 1,
+        }
+        fail_msg = ""
+        try:
+            self.kafka.create_topic(topic_cfg)
+            if not expected_to_succeed:
+                fail_msg = "Failure: Did not encounter controller mutation 
quota violation when one was expected"
+            else:
+                self.logger.info("Success: Encountered no quota violation as 
expected")
+        except RuntimeError as e:
+            if expected_to_succeed:
+                fail_msg = "Failure: Unexpected Exception - %s" % str(e)
+            else:
+                self.logger.info("Success: Encountered quota violation 
exception as expected")
+        if fail_msg:
+            self.logger.error(fail_msg)
+            raise RuntimeError(fail_msg)
\ No newline at end of file

Reply via email to