This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 06185618b16 KAFKA-19817 Move DynamicTopicClusterQuotaPublisher to 
metadata module (#20783)
06185618b16 is described below

commit 06185618b169274914c27d7bcb46a5fa8c1c54f6
Author: jimmy <[email protected]>
AuthorDate: Sun Jan 18 01:11:35 2026 +0800

    KAFKA-19817 Move DynamicTopicClusterQuotaPublisher to metadata module 
(#20783)
    
    see: [KAFKA-19817](https://issues.apache.org/jira/browse/KAFKA-19817)
    This PR moves `DynamicTopicClusterQuotaPublisher` to the
    metadata module. To prevent circular dependencies, several associated
    classes have been migrated as well, including `ClientSensors`,
    `ClientQuotaMetadataManager`, `ThrottleCallback` and `ThrottledChannel`.
    Additionally, a new interface `QuotaManagersProvider` has been
    introduced to avoid the need to move class `QuotaManagers` in
    `QuotaFactory`, as doing so would have necessitated significantly more
    extensive changes.
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 checkstyle/import-control-metadata.xml             |  1 +
 core/src/main/java/kafka/server/QuotaFactory.java  | 10 +++
 .../main/scala/kafka/network/RequestChannel.scala  |  3 +-
 .../src/main/scala/kafka/server/BrokerServer.scala |  7 +-
 .../main/scala/kafka/server/ControllerServer.scala |  9 +--
 .../server/metadata/BrokerMetadataPublisher.scala  |  4 +-
 .../DynamicTopicClusterQuotaPublisher.scala        | 72 --------------------
 .../unit/kafka/server/ClientQuotaManagerTest.scala |  2 +-
 .../unit/kafka/server/ControllerApisTest.scala     |  3 +-
 .../metadata/BrokerMetadataPublisherTest.scala     |  2 +-
 .../DynamicTopicClusterQuotaPublisher.java         | 76 ++++++++++++++++++++++
 .../publisher/QuotaConfigChangeListener.java       | 26 ++++++++
 12 files changed, 128 insertions(+), 87 deletions(-)

diff --git a/checkstyle/import-control-metadata.xml 
b/checkstyle/import-control-metadata.xml
index ce036cf7cbf..5c8e377f628 100644
--- a/checkstyle/import-control-metadata.xml
+++ b/checkstyle/import-control-metadata.xml
@@ -161,6 +161,7 @@
         <allow pkg="org.apache.kafka.server.common" />
         <allow pkg="org.apache.kafka.server.fault" />
         <allow pkg="org.apache.kafka.server.config" />
+        <allow pkg="org.apache.kafka.server.quota" />
         <allow pkg="org.apache.kafka.server.util"/>
         <allow pkg="org.apache.kafka.test" />
         <subpackage name="authorizer">
diff --git a/core/src/main/java/kafka/server/QuotaFactory.java 
b/core/src/main/java/kafka/server/QuotaFactory.java
index b672be42650..896e52da770 100644
--- a/core/src/main/java/kafka/server/QuotaFactory.java
+++ b/core/src/main/java/kafka/server/QuotaFactory.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.internals.Plugin;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.metadata.publisher.QuotaConfigChangeListener;
 import org.apache.kafka.server.config.ClientQuotaManagerConfig;
 import org.apache.kafka.server.config.QuotaConfig;
 import org.apache.kafka.server.config.ReplicationQuotaManagerConfig;
@@ -67,6 +68,15 @@ public class QuotaFactory {
             controllerMutation.shutdown();
             clientQuotaCallbackPlugin.ifPresent(plugin -> 
Utils.closeQuietly(plugin, "client quota callback plugin"));
         }
+
+        public QuotaConfigChangeListener quotaConfigChangeListener() {
+            return () -> {
+                fetch.updateQuotaMetricConfigs();
+                produce.updateQuotaMetricConfigs();
+                request.updateQuotaMetricConfigs();
+                controllerMutation.updateQuotaMetricConfigs();
+            };
+        }
     }
 
     public static QuotaManagers instantiate(
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala 
b/core/src/main/scala/kafka/network/RequestChannel.scala
index a0fbc045206..de0a9d93173 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -31,11 +31,10 @@ import org.apache.kafka.common.network.Send
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.utils.Time
-import org.apache.kafka.network.Session
 import org.apache.kafka.network.metrics.{RequestChannelMetrics, RequestMetrics}
 import org.apache.kafka.server.common.RequestLocal
 import org.apache.kafka.server.metrics.KafkaMetricsGroup
-import org.apache.kafka.network.RequestConvertToJson
+import org.apache.kafka.network.{RequestConvertToJson, Session}
 
 import scala.jdk.CollectionConverters._
 import scala.jdk.OptionConverters.RichOption
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala 
b/core/src/main/scala/kafka/server/BrokerServer.scala
index c173a9aad04..5c8b16fe2ad 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -42,7 +42,7 @@ import org.apache.kafka.coordinator.share.{ShareCoordinator, 
ShareCoordinatorRec
 import org.apache.kafka.coordinator.transaction.ProducerIdManager
 import org.apache.kafka.image.publisher.{BrokerRegistrationTracker, 
MetadataPublisher}
 import org.apache.kafka.metadata.{BrokerState, ListenerInfo, 
KRaftMetadataCache, MetadataCache, MetadataVersionConfigValidator}
-import org.apache.kafka.metadata.publisher.{AclPublisher, 
DelegationTokenPublisher, DynamicClientQuotaPublisher, ScramPublisher}
+import org.apache.kafka.metadata.publisher.{AclPublisher, 
DelegationTokenPublisher, DynamicClientQuotaPublisher, 
DynamicTopicClusterQuotaPublisher, ScramPublisher}
 import org.apache.kafka.security.{CredentialProvider, DelegationTokenManager}
 import org.apache.kafka.server.authorizer.Authorizer
 import org.apache.kafka.server.common.{ApiMessageAndVersion, 
DirectoryEventHandler, NodeToControllerChannelManager, TopicIdPartition}
@@ -506,10 +506,11 @@ class BrokerServer(
         ),
         new DynamicTopicClusterQuotaPublisher(
           clusterId,
-          config,
+          config.nodeId,
           sharedServer.metadataPublishingFaultHandler,
           "broker",
-          quotaManagers,
+          quotaManagers.clientQuotaCallbackPlugin(),
+          quotaManagers.quotaConfigChangeListener()
         ),
         new ScramPublisher(
           config.nodeId,
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala 
b/core/src/main/scala/kafka/server/ControllerServer.scala
index 966ade4403d..b19cf062f56 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -20,7 +20,7 @@ package kafka.server
 import kafka.network.SocketServer
 import kafka.raft.KafkaRaftManager
 import kafka.server.QuotaFactory.QuotaManagers
-import kafka.server.metadata.{ClientQuotaMetadataManager, 
DynamicConfigPublisher, DynamicTopicClusterQuotaPublisher, 
KRaftMetadataCachePublisher}
+import kafka.server.metadata.{ClientQuotaMetadataManager, 
DynamicConfigPublisher, KRaftMetadataCachePublisher}
 
 import scala.collection.immutable
 import kafka.utils.{CoreUtils, Logging}
@@ -38,7 +38,7 @@ import 
org.apache.kafka.image.publisher.{ControllerRegistrationsPublisher, Metad
 import org.apache.kafka.metadata.{KafkaConfigSchema, KRaftMetadataCache, 
ListenerInfo}
 import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer
 import org.apache.kafka.metadata.bootstrap.BootstrapMetadata
-import org.apache.kafka.metadata.publisher.{AclPublisher, 
DelegationTokenPublisher, DynamicClientQuotaPublisher, FeaturesPublisher, 
ScramPublisher}
+import org.apache.kafka.metadata.publisher.{AclPublisher, 
DelegationTokenPublisher, DynamicClientQuotaPublisher, 
DynamicTopicClusterQuotaPublisher, FeaturesPublisher, ScramPublisher}
 import org.apache.kafka.raft.QuorumConfig
 import org.apache.kafka.security.{CredentialProvider, DelegationTokenManager}
 import org.apache.kafka.server.{ProcessRole, SimpleApiVersionManager}
@@ -349,10 +349,11 @@ class ControllerServer(
       // Set up the DynamicTopicClusterQuotaPublisher. This will enable quotas 
for the cluster and topics.
       metadataPublishers.add(new DynamicTopicClusterQuotaPublisher(
         clusterId,
-        config,
+        config.nodeId,
         sharedServer.metadataPublishingFaultHandler,
         "controller",
-        quotaManagers,
+        quotaManagers.clientQuotaCallbackPlugin(),
+        quotaManagers.quotaConfigChangeListener()
       ))
 
       // Set up the SCRAM publisher.
diff --git 
a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala 
b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
index 35c44b9524d..1313bda8aa1 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
@@ -32,7 +32,7 @@ import org.apache.kafka.image.loader.LoaderManifest
 import org.apache.kafka.image.publisher.MetadataPublisher
 import org.apache.kafka.image.{MetadataDelta, MetadataImage, TopicDelta}
 import org.apache.kafka.metadata.KRaftMetadataCache
-import org.apache.kafka.metadata.publisher.{AclPublisher, 
DelegationTokenPublisher, DynamicClientQuotaPublisher, ScramPublisher}
+import org.apache.kafka.metadata.publisher.{AclPublisher, 
DelegationTokenPublisher, DynamicClientQuotaPublisher, 
DynamicTopicClusterQuotaPublisher, ScramPublisher}
 import org.apache.kafka.server.common.MetadataVersion.MINIMUM_VERSION
 import org.apache.kafka.server.common.{FinalizedFeatures, RequestLocal, 
ShareVersion}
 import org.apache.kafka.server.fault.FaultHandler
@@ -203,7 +203,7 @@ class BrokerMetadataPublisher(
       dynamicClientQuotaPublisher.onMetadataUpdate(delta, newImage, manifest)
 
       // Apply topic or cluster quotas delta.
-      dynamicTopicClusterQuotaPublisher.onMetadataUpdate(delta, newImage)
+      dynamicTopicClusterQuotaPublisher.onMetadataUpdate(delta, newImage, 
manifest)
 
       // Apply SCRAM delta.
       scramPublisher.onMetadataUpdate(delta, newImage, manifest)
diff --git 
a/core/src/main/scala/kafka/server/metadata/DynamicTopicClusterQuotaPublisher.scala
 
b/core/src/main/scala/kafka/server/metadata/DynamicTopicClusterQuotaPublisher.scala
deleted file mode 100644
index 7798c18b4d6..00000000000
--- 
a/core/src/main/scala/kafka/server/metadata/DynamicTopicClusterQuotaPublisher.scala
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-package kafka.server.metadata
-
-import kafka.server.KafkaConfig
-import kafka.server.QuotaFactory.QuotaManagers
-import kafka.utils.Logging
-import org.apache.kafka.image.{MetadataDelta, MetadataImage}
-import org.apache.kafka.image.loader.LoaderManifest
-import org.apache.kafka.metadata.MetadataCache
-import org.apache.kafka.server.fault.FaultHandler
-
-/**
- * Publishing dynamic topic or cluster changes to the client quota manager.
- * Temporary solution since Cluster objects are immutable and costly to update 
for every metadata change.
- * See KAFKA-18239 to trace the issue.
- */
-class DynamicTopicClusterQuotaPublisher (
-  clusterId: String,
-  conf: KafkaConfig,
-  faultHandler: FaultHandler,
-  nodeType: String,
-  quotaManagers: QuotaManagers
-) extends Logging with org.apache.kafka.image.publisher.MetadataPublisher {
-  logIdent = s"[${name()}] "
-
-  override def name(): String = s"DynamicTopicClusterQuotaPublisher $nodeType 
id=${conf.nodeId}"
-
-  override def onMetadataUpdate(
-    delta: MetadataDelta,
-    newImage: MetadataImage,
-    manifest: LoaderManifest
-  ): Unit = {
-    onMetadataUpdate(delta, newImage)
-  }
-
-  def onMetadataUpdate(
-    delta: MetadataDelta,
-    newImage: MetadataImage,
-  ): Unit = {
-    try {
-      quotaManagers.clientQuotaCallbackPlugin().ifPresent(plugin => {
-        if (delta.topicsDelta() != null || delta.clusterDelta() != null) {
-          val cluster = MetadataCache.toCluster(clusterId, newImage)
-          if (plugin.get().updateClusterMetadata(cluster)) {
-            quotaManagers.fetch.updateQuotaMetricConfigs()
-            quotaManagers.produce.updateQuotaMetricConfigs()
-            quotaManagers.request.updateQuotaMetricConfigs()
-            quotaManagers.controllerMutation.updateQuotaMetricConfigs()
-          }
-        }
-      })
-    } catch {
-      case t: Throwable =>
-        val deltaName = s"MetadataDelta up to 
${newImage.highestOffsetAndEpoch().offset}"
-        faultHandler.handleFault("Uncaught exception while " +
-          s"publishing dynamic topic or cluster changes from $deltaName", t)
-    }
-  }
-}
- 
\ No newline at end of file
diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
index c166eef8012..142c00441eb 100644
--- a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
@@ -21,8 +21,8 @@ import java.net.InetAddress
 import org.apache.kafka.common.internals.Plugin
 import org.apache.kafka.common.metrics.Quota
 import org.apache.kafka.common.security.auth.KafkaPrincipal
-import org.apache.kafka.server.config.ClientQuotaManagerConfig
 import org.apache.kafka.network.Session
+import org.apache.kafka.server.config.ClientQuotaManagerConfig
 import org.apache.kafka.server.quota.{ClientQuotaCallback, ClientQuotaEntity, 
ClientQuotaManager, ClientQuotaType, QuotaType}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Test
diff --git a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala 
b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
index 0acc83e64a9..23bb4cc413b 100644
--- a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
@@ -52,9 +52,8 @@ import 
org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEX
 import org.apache.kafka.controller.{Controller, ControllerRequestContext, 
ResultOrError}
 import org.apache.kafka.image.publisher.ControllerRegistrationsPublisher
 import org.apache.kafka.metadata.KRaftMetadataCache
-import org.apache.kafka.network.SocketServerConfigs
+import org.apache.kafka.network.{Session, SocketServerConfigs}
 import org.apache.kafka.network.metrics.RequestChannelMetrics
-import org.apache.kafka.network.Session
 import org.apache.kafka.raft.{KRaftConfigs, QuorumConfig, RaftManager}
 import org.apache.kafka.server.SimpleApiVersionManager
 import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext, 
AuthorizationResult, Authorizer}
diff --git 
a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
 
b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
index dc3b987488f..f271d238d06 100644
--- 
a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
@@ -40,7 +40,7 @@ import org.apache.kafka.coordinator.share.ShareCoordinator
 import org.apache.kafka.image.{AclsImage, ClientQuotasImage, ClusterImageTest, 
ConfigurationsImage, DelegationTokenImage, FeaturesImage, MetadataDelta, 
MetadataImage, MetadataImageTest, MetadataProvenance, ProducerIdsImage, 
ScramImage, TopicsImage}
 import org.apache.kafka.image.loader.LogDeltaManifest
 import org.apache.kafka.metadata.KRaftMetadataCache
-import org.apache.kafka.metadata.publisher.{AclPublisher, 
DelegationTokenPublisher, DynamicClientQuotaPublisher, ScramPublisher}
+import org.apache.kafka.metadata.publisher.{AclPublisher, 
DelegationTokenPublisher, DynamicClientQuotaPublisher, 
DynamicTopicClusterQuotaPublisher, ScramPublisher}
 import org.apache.kafka.network.SocketServerConfigs
 import org.apache.kafka.raft.LeaderAndEpoch
 import org.apache.kafka.server.common.{KRaftVersion, MetadataVersion, 
ShareVersion}
diff --git 
a/metadata/src/main/java/org/apache/kafka/metadata/publisher/DynamicTopicClusterQuotaPublisher.java
 
b/metadata/src/main/java/org/apache/kafka/metadata/publisher/DynamicTopicClusterQuotaPublisher.java
new file mode 100644
index 00000000000..1ffc8f60783
--- /dev/null
+++ 
b/metadata/src/main/java/org/apache/kafka/metadata/publisher/DynamicTopicClusterQuotaPublisher.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.metadata.publisher;
+
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.internals.Plugin;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.loader.LoaderManifest;
+import org.apache.kafka.image.publisher.MetadataPublisher;
+import org.apache.kafka.metadata.MetadataCache;
+import org.apache.kafka.server.fault.FaultHandler;
+import org.apache.kafka.server.quota.ClientQuotaCallback;
+
+import java.util.Optional;
+
+public class DynamicTopicClusterQuotaPublisher implements MetadataPublisher {
+    private final String clusterId;
+    private final int nodeId;
+    private final FaultHandler faultHandler;
+    private final String nodeType;
+    private final Optional<Plugin<ClientQuotaCallback>> 
clientQuotaCallbackPlugin;
+    private final QuotaConfigChangeListener quotaConfigChangeListener;
+
+    public DynamicTopicClusterQuotaPublisher(
+        String clusterId,
+        int nodeId,
+        FaultHandler faultHandler,
+        String nodeType,
+        Optional<Plugin<ClientQuotaCallback>> clientQuotaCallbackPlugin,
+        QuotaConfigChangeListener quotaConfigChangeListener
+    ) {
+        this.clusterId = clusterId;
+        this.nodeId = nodeId;
+        this.faultHandler = faultHandler;
+        this.nodeType = nodeType;
+        this.clientQuotaCallbackPlugin = clientQuotaCallbackPlugin;
+        this.quotaConfigChangeListener = quotaConfigChangeListener;
+    }
+
+    @Override
+    public String name() {
+        return "DynamicTopicClusterQuotaPublisher " + nodeType + " id=" + 
nodeId;
+    }
+
+    @Override
+    public void onMetadataUpdate(MetadataDelta delta, MetadataImage newImage, 
LoaderManifest manifest) {
+        try {
+            clientQuotaCallbackPlugin.ifPresent(plugin -> {
+                if (delta.topicsDelta() != null || delta.clusterDelta() != 
null) {
+                    Cluster cluster = MetadataCache.toCluster(clusterId, 
newImage);
+                    if (plugin.get().updateClusterMetadata(cluster)) {
+                        quotaConfigChangeListener.onChange();
+                    }
+                }
+            });
+        } catch (Throwable e) {
+            String deltaName = "MetadataDelta up to " + 
newImage.highestOffsetAndEpoch().offset();
+            faultHandler.handleFault("Uncaught exception while publishing 
dynamic topic or cluster changes from " + deltaName, e);
+        }
+    }
+}
diff --git 
a/metadata/src/main/java/org/apache/kafka/metadata/publisher/QuotaConfigChangeListener.java
 
b/metadata/src/main/java/org/apache/kafka/metadata/publisher/QuotaConfigChangeListener.java
new file mode 100644
index 00000000000..a2961eb2b0f
--- /dev/null
+++ 
b/metadata/src/main/java/org/apache/kafka/metadata/publisher/QuotaConfigChangeListener.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.metadata.publisher;
+
+/**
+ * A callback interface for reacting to quota configuration changes.
+ * This is used by DynamicTopicClusterQuotaPublisher to notify when quota 
metadata has changed.
+ */
+@FunctionalInterface
+public interface QuotaConfigChangeListener {
+    void onChange();
+}

Reply via email to