This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 06185618b16 KAFKA-19817 Move DynamicTopicClusterQuotaPublisher to
metadata module (#20783)
06185618b16 is described below
commit 06185618b169274914c27d7bcb46a5fa8c1c54f6
Author: jimmy <[email protected]>
AuthorDate: Sun Jan 18 01:11:35 2026 +0800
KAFKA-19817 Move DynamicTopicClusterQuotaPublisher to metadata module
(#20783)
see: [KAFKA-19817](https://issues.apache.org/jira/browse/KAFKA-19817)
This PR intended to move `DynamicTopicClusterQuotaPublisher` to the
metadata module . To prevent circular dependencies, several associated
classes have been migrated as well,including `ClientSensors` ,
`ClientQuotaMetadataManager`, `ThrottleCallback` and `ThrottledChannel`.
Additionally, a new interface `QuotaManagersProvider` has been
introduced to avoid the need to move class `QuotaManagers` in
`QuotaFactory`, as doing so would have necessitated significantly more
extensive changes.
Reviewers: Chia-Ping Tsai <[email protected]>
---
checkstyle/import-control-metadata.xml | 1 +
core/src/main/java/kafka/server/QuotaFactory.java | 10 +++
.../main/scala/kafka/network/RequestChannel.scala | 3 +-
.../src/main/scala/kafka/server/BrokerServer.scala | 7 +-
.../main/scala/kafka/server/ControllerServer.scala | 9 +--
.../server/metadata/BrokerMetadataPublisher.scala | 4 +-
.../DynamicTopicClusterQuotaPublisher.scala | 72 --------------------
.../unit/kafka/server/ClientQuotaManagerTest.scala | 2 +-
.../unit/kafka/server/ControllerApisTest.scala | 3 +-
.../metadata/BrokerMetadataPublisherTest.scala | 2 +-
.../DynamicTopicClusterQuotaPublisher.java | 76 ++++++++++++++++++++++
.../publisher/QuotaConfigChangeListener.java | 26 ++++++++
12 files changed, 128 insertions(+), 87 deletions(-)
diff --git a/checkstyle/import-control-metadata.xml
b/checkstyle/import-control-metadata.xml
index ce036cf7cbf..5c8e377f628 100644
--- a/checkstyle/import-control-metadata.xml
+++ b/checkstyle/import-control-metadata.xml
@@ -161,6 +161,7 @@
<allow pkg="org.apache.kafka.server.common" />
<allow pkg="org.apache.kafka.server.fault" />
<allow pkg="org.apache.kafka.server.config" />
+ <allow pkg="org.apache.kafka.server.quota" />
<allow pkg="org.apache.kafka.server.util"/>
<allow pkg="org.apache.kafka.test" />
<subpackage name="authorizer">
diff --git a/core/src/main/java/kafka/server/QuotaFactory.java
b/core/src/main/java/kafka/server/QuotaFactory.java
index b672be42650..896e52da770 100644
--- a/core/src/main/java/kafka/server/QuotaFactory.java
+++ b/core/src/main/java/kafka/server/QuotaFactory.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.internals.Plugin;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.metadata.publisher.QuotaConfigChangeListener;
import org.apache.kafka.server.config.ClientQuotaManagerConfig;
import org.apache.kafka.server.config.QuotaConfig;
import org.apache.kafka.server.config.ReplicationQuotaManagerConfig;
@@ -67,6 +68,15 @@ public class QuotaFactory {
controllerMutation.shutdown();
clientQuotaCallbackPlugin.ifPresent(plugin ->
Utils.closeQuietly(plugin, "client quota callback plugin"));
}
+
+ public QuotaConfigChangeListener quotaConfigChangeListener() {
+ return () -> {
+ fetch.updateQuotaMetricConfigs();
+ produce.updateQuotaMetricConfigs();
+ request.updateQuotaMetricConfigs();
+ controllerMutation.updateQuotaMetricConfigs();
+ };
+ }
}
public static QuotaManagers instantiate(
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala
b/core/src/main/scala/kafka/network/RequestChannel.scala
index a0fbc045206..de0a9d93173 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -31,11 +31,10 @@ import org.apache.kafka.common.network.Send
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests._
import org.apache.kafka.common.utils.Time
-import org.apache.kafka.network.Session
import org.apache.kafka.network.metrics.{RequestChannelMetrics, RequestMetrics}
import org.apache.kafka.server.common.RequestLocal
import org.apache.kafka.server.metrics.KafkaMetricsGroup
-import org.apache.kafka.network.RequestConvertToJson
+import org.apache.kafka.network.{RequestConvertToJson, Session}
import scala.jdk.CollectionConverters._
import scala.jdk.OptionConverters.RichOption
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala
b/core/src/main/scala/kafka/server/BrokerServer.scala
index c173a9aad04..5c8b16fe2ad 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -42,7 +42,7 @@ import org.apache.kafka.coordinator.share.{ShareCoordinator,
ShareCoordinatorRec
import org.apache.kafka.coordinator.transaction.ProducerIdManager
import org.apache.kafka.image.publisher.{BrokerRegistrationTracker,
MetadataPublisher}
import org.apache.kafka.metadata.{BrokerState, ListenerInfo,
KRaftMetadataCache, MetadataCache, MetadataVersionConfigValidator}
-import org.apache.kafka.metadata.publisher.{AclPublisher,
DelegationTokenPublisher, DynamicClientQuotaPublisher, ScramPublisher}
+import org.apache.kafka.metadata.publisher.{AclPublisher,
DelegationTokenPublisher, DynamicClientQuotaPublisher,
DynamicTopicClusterQuotaPublisher, ScramPublisher}
import org.apache.kafka.security.{CredentialProvider, DelegationTokenManager}
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.{ApiMessageAndVersion,
DirectoryEventHandler, NodeToControllerChannelManager, TopicIdPartition}
@@ -506,10 +506,11 @@ class BrokerServer(
),
new DynamicTopicClusterQuotaPublisher(
clusterId,
- config,
+ config.nodeId,
sharedServer.metadataPublishingFaultHandler,
"broker",
- quotaManagers,
+ quotaManagers.clientQuotaCallbackPlugin(),
+ quotaManagers.quotaConfigChangeListener()
),
new ScramPublisher(
config.nodeId,
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala
b/core/src/main/scala/kafka/server/ControllerServer.scala
index 966ade4403d..b19cf062f56 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -20,7 +20,7 @@ package kafka.server
import kafka.network.SocketServer
import kafka.raft.KafkaRaftManager
import kafka.server.QuotaFactory.QuotaManagers
-import kafka.server.metadata.{ClientQuotaMetadataManager,
DynamicConfigPublisher, DynamicTopicClusterQuotaPublisher,
KRaftMetadataCachePublisher}
+import kafka.server.metadata.{ClientQuotaMetadataManager,
DynamicConfigPublisher, KRaftMetadataCachePublisher}
import scala.collection.immutable
import kafka.utils.{CoreUtils, Logging}
@@ -38,7 +38,7 @@ import
org.apache.kafka.image.publisher.{ControllerRegistrationsPublisher, Metad
import org.apache.kafka.metadata.{KafkaConfigSchema, KRaftMetadataCache,
ListenerInfo}
import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata
-import org.apache.kafka.metadata.publisher.{AclPublisher,
DelegationTokenPublisher, DynamicClientQuotaPublisher, FeaturesPublisher,
ScramPublisher}
+import org.apache.kafka.metadata.publisher.{AclPublisher,
DelegationTokenPublisher, DynamicClientQuotaPublisher,
DynamicTopicClusterQuotaPublisher, FeaturesPublisher, ScramPublisher}
import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.security.{CredentialProvider, DelegationTokenManager}
import org.apache.kafka.server.{ProcessRole, SimpleApiVersionManager}
@@ -349,10 +349,11 @@ class ControllerServer(
// Set up the DynamicTopicClusterQuotaPublisher. This will enable quotas
for the cluster and topics.
metadataPublishers.add(new DynamicTopicClusterQuotaPublisher(
clusterId,
- config,
+ config.nodeId,
sharedServer.metadataPublishingFaultHandler,
"controller",
- quotaManagers,
+ quotaManagers.clientQuotaCallbackPlugin(),
+ quotaManagers.quotaConfigChangeListener()
))
// Set up the SCRAM publisher.
diff --git
a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
index 35c44b9524d..1313bda8aa1 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
@@ -32,7 +32,7 @@ import org.apache.kafka.image.loader.LoaderManifest
import org.apache.kafka.image.publisher.MetadataPublisher
import org.apache.kafka.image.{MetadataDelta, MetadataImage, TopicDelta}
import org.apache.kafka.metadata.KRaftMetadataCache
-import org.apache.kafka.metadata.publisher.{AclPublisher,
DelegationTokenPublisher, DynamicClientQuotaPublisher, ScramPublisher}
+import org.apache.kafka.metadata.publisher.{AclPublisher,
DelegationTokenPublisher, DynamicClientQuotaPublisher,
DynamicTopicClusterQuotaPublisher, ScramPublisher}
import org.apache.kafka.server.common.MetadataVersion.MINIMUM_VERSION
import org.apache.kafka.server.common.{FinalizedFeatures, RequestLocal,
ShareVersion}
import org.apache.kafka.server.fault.FaultHandler
@@ -203,7 +203,7 @@ class BrokerMetadataPublisher(
dynamicClientQuotaPublisher.onMetadataUpdate(delta, newImage, manifest)
// Apply topic or cluster quotas delta.
- dynamicTopicClusterQuotaPublisher.onMetadataUpdate(delta, newImage)
+ dynamicTopicClusterQuotaPublisher.onMetadataUpdate(delta, newImage,
manifest)
// Apply SCRAM delta.
scramPublisher.onMetadataUpdate(delta, newImage, manifest)
diff --git
a/core/src/main/scala/kafka/server/metadata/DynamicTopicClusterQuotaPublisher.scala
b/core/src/main/scala/kafka/server/metadata/DynamicTopicClusterQuotaPublisher.scala
deleted file mode 100644
index 7798c18b4d6..00000000000
---
a/core/src/main/scala/kafka/server/metadata/DynamicTopicClusterQuotaPublisher.scala
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-package kafka.server.metadata
-
-import kafka.server.KafkaConfig
-import kafka.server.QuotaFactory.QuotaManagers
-import kafka.utils.Logging
-import org.apache.kafka.image.{MetadataDelta, MetadataImage}
-import org.apache.kafka.image.loader.LoaderManifest
-import org.apache.kafka.metadata.MetadataCache
-import org.apache.kafka.server.fault.FaultHandler
-
-/**
- * Publishing dynamic topic or cluster changes to the client quota manager.
- * Temporary solution since Cluster objects are immutable and costly to update
for every metadata change.
- * See KAFKA-18239 to trace the issue.
- */
-class DynamicTopicClusterQuotaPublisher (
- clusterId: String,
- conf: KafkaConfig,
- faultHandler: FaultHandler,
- nodeType: String,
- quotaManagers: QuotaManagers
-) extends Logging with org.apache.kafka.image.publisher.MetadataPublisher {
- logIdent = s"[${name()}] "
-
- override def name(): String = s"DynamicTopicClusterQuotaPublisher $nodeType
id=${conf.nodeId}"
-
- override def onMetadataUpdate(
- delta: MetadataDelta,
- newImage: MetadataImage,
- manifest: LoaderManifest
- ): Unit = {
- onMetadataUpdate(delta, newImage)
- }
-
- def onMetadataUpdate(
- delta: MetadataDelta,
- newImage: MetadataImage,
- ): Unit = {
- try {
- quotaManagers.clientQuotaCallbackPlugin().ifPresent(plugin => {
- if (delta.topicsDelta() != null || delta.clusterDelta() != null) {
- val cluster = MetadataCache.toCluster(clusterId, newImage)
- if (plugin.get().updateClusterMetadata(cluster)) {
- quotaManagers.fetch.updateQuotaMetricConfigs()
- quotaManagers.produce.updateQuotaMetricConfigs()
- quotaManagers.request.updateQuotaMetricConfigs()
- quotaManagers.controllerMutation.updateQuotaMetricConfigs()
- }
- }
- })
- } catch {
- case t: Throwable =>
- val deltaName = s"MetadataDelta up to
${newImage.highestOffsetAndEpoch().offset}"
- faultHandler.handleFault("Uncaught exception while " +
- s"publishing dynamic topic or cluster changes from $deltaName", t)
- }
- }
-}
-
\ No newline at end of file
diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
index c166eef8012..142c00441eb 100644
--- a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
@@ -21,8 +21,8 @@ import java.net.InetAddress
import org.apache.kafka.common.internals.Plugin
import org.apache.kafka.common.metrics.Quota
import org.apache.kafka.common.security.auth.KafkaPrincipal
-import org.apache.kafka.server.config.ClientQuotaManagerConfig
import org.apache.kafka.network.Session
+import org.apache.kafka.server.config.ClientQuotaManagerConfig
import org.apache.kafka.server.quota.{ClientQuotaCallback, ClientQuotaEntity,
ClientQuotaManager, ClientQuotaType, QuotaType}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
diff --git a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
index 0acc83e64a9..23bb4cc413b 100644
--- a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
@@ -52,9 +52,8 @@ import
org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEX
import org.apache.kafka.controller.{Controller, ControllerRequestContext,
ResultOrError}
import org.apache.kafka.image.publisher.ControllerRegistrationsPublisher
import org.apache.kafka.metadata.KRaftMetadataCache
-import org.apache.kafka.network.SocketServerConfigs
+import org.apache.kafka.network.{Session, SocketServerConfigs}
import org.apache.kafka.network.metrics.RequestChannelMetrics
-import org.apache.kafka.network.Session
import org.apache.kafka.raft.{KRaftConfigs, QuorumConfig, RaftManager}
import org.apache.kafka.server.SimpleApiVersionManager
import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext,
AuthorizationResult, Authorizer}
diff --git
a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
index dc3b987488f..f271d238d06 100644
---
a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
+++
b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
@@ -40,7 +40,7 @@ import org.apache.kafka.coordinator.share.ShareCoordinator
import org.apache.kafka.image.{AclsImage, ClientQuotasImage, ClusterImageTest,
ConfigurationsImage, DelegationTokenImage, FeaturesImage, MetadataDelta,
MetadataImage, MetadataImageTest, MetadataProvenance, ProducerIdsImage,
ScramImage, TopicsImage}
import org.apache.kafka.image.loader.LogDeltaManifest
import org.apache.kafka.metadata.KRaftMetadataCache
-import org.apache.kafka.metadata.publisher.{AclPublisher,
DelegationTokenPublisher, DynamicClientQuotaPublisher, ScramPublisher}
+import org.apache.kafka.metadata.publisher.{AclPublisher,
DelegationTokenPublisher, DynamicClientQuotaPublisher,
DynamicTopicClusterQuotaPublisher, ScramPublisher}
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.raft.LeaderAndEpoch
import org.apache.kafka.server.common.{KRaftVersion, MetadataVersion,
ShareVersion}
diff --git
a/metadata/src/main/java/org/apache/kafka/metadata/publisher/DynamicTopicClusterQuotaPublisher.java
b/metadata/src/main/java/org/apache/kafka/metadata/publisher/DynamicTopicClusterQuotaPublisher.java
new file mode 100644
index 00000000000..1ffc8f60783
--- /dev/null
+++
b/metadata/src/main/java/org/apache/kafka/metadata/publisher/DynamicTopicClusterQuotaPublisher.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.metadata.publisher;
+
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.internals.Plugin;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.loader.LoaderManifest;
+import org.apache.kafka.image.publisher.MetadataPublisher;
+import org.apache.kafka.metadata.MetadataCache;
+import org.apache.kafka.server.fault.FaultHandler;
+import org.apache.kafka.server.quota.ClientQuotaCallback;
+
+import java.util.Optional;
+
+public class DynamicTopicClusterQuotaPublisher implements MetadataPublisher {
+ private final String clusterId;
+ private final int nodeId;
+ private final FaultHandler faultHandler;
+ private final String nodeType;
+ private final Optional<Plugin<ClientQuotaCallback>>
clientQuotaCallbackPlugin;
+ private final QuotaConfigChangeListener quotaConfigChangeListener;
+
+ public DynamicTopicClusterQuotaPublisher(
+ String clusterId,
+ int nodeId,
+ FaultHandler faultHandler,
+ String nodeType,
+ Optional<Plugin<ClientQuotaCallback>> clientQuotaCallbackPlugin,
+ QuotaConfigChangeListener quotaConfigChangeListener
+ ) {
+ this.clusterId = clusterId;
+ this.nodeId = nodeId;
+ this.faultHandler = faultHandler;
+ this.nodeType = nodeType;
+ this.clientQuotaCallbackPlugin = clientQuotaCallbackPlugin;
+ this.quotaConfigChangeListener = quotaConfigChangeListener;
+ }
+
+ @Override
+ public String name() {
+ return "DynamicTopicClusterQuotaPublisher " + nodeType + " id=" +
nodeId;
+ }
+
+ @Override
+ public void onMetadataUpdate(MetadataDelta delta, MetadataImage newImage,
LoaderManifest manifest) {
+ try {
+ clientQuotaCallbackPlugin.ifPresent(plugin -> {
+ if (delta.topicsDelta() != null || delta.clusterDelta() !=
null) {
+ Cluster cluster = MetadataCache.toCluster(clusterId,
newImage);
+ if (plugin.get().updateClusterMetadata(cluster)) {
+ quotaConfigChangeListener.onChange();
+ }
+ }
+ });
+ } catch (Throwable e) {
+ String deltaName = "MetadataDelta up to " +
newImage.highestOffsetAndEpoch().offset();
+ faultHandler.handleFault("Uncaught exception while publishing
dynamic topic or cluster changes from " + deltaName, e);
+ }
+ }
+}
diff --git
a/metadata/src/main/java/org/apache/kafka/metadata/publisher/QuotaConfigChangeListener.java
b/metadata/src/main/java/org/apache/kafka/metadata/publisher/QuotaConfigChangeListener.java
new file mode 100644
index 00000000000..a2961eb2b0f
--- /dev/null
+++
b/metadata/src/main/java/org/apache/kafka/metadata/publisher/QuotaConfigChangeListener.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.metadata.publisher;
+
+/**
+ * A callback interface for reacting to quota configuration changes.
+ * This is used by DynamicTopicClusterQuotaPublisher to notify when quota
metadata has changed.
+ */
+@FunctionalInterface
+public interface QuotaConfigChangeListener {
+ void onChange();
+}