This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a5e5e2dcd5d KAFKA-18706 Move AclPublisher to metadata module (#18802)
a5e5e2dcd5d is described below
commit a5e5e2dcd5dc7986b9e11294bb3bd6c7ecbc94ea
Author: PoAn Yang <[email protected]>
AuthorDate: Sun Mar 9 21:00:33 2025 +0800
KAFKA-18706 Move AclPublisher to metadata module (#18802)
Move AclPublisher to org.apache.kafka.metadata.publisher package.
Reviewers: Christo Lolov <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
.../src/main/scala/kafka/server/BrokerServer.scala | 3 +-
.../main/scala/kafka/server/ControllerServer.scala | 6 +-
.../scala/kafka/server/metadata/AclPublisher.scala | 102 -------------------
.../server/metadata/BrokerMetadataPublisher.scala | 1 +
.../metadata/BrokerMetadataPublisherTest.scala | 1 +
.../kafka/metadata/publisher/AclPublisher.java | 108 +++++++++++++++++++++
6 files changed, 115 insertions(+), 106 deletions(-)
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index 7ff50d175b5..5a65e028877 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -42,6 +42,7 @@ import org.apache.kafka.coordinator.share.{ShareCoordinator, ShareCoordinatorRec
import org.apache.kafka.coordinator.transaction.ProducerIdManager
 import org.apache.kafka.image.publisher.{BrokerRegistrationTracker, MetadataPublisher}
import org.apache.kafka.metadata.{BrokerState, ListenerInfo}
+import org.apache.kafka.metadata.publisher.AclPublisher
import org.apache.kafka.security.CredentialProvider
import org.apache.kafka.server.authorizer.Authorizer
 import org.apache.kafka.server.common.{ApiMessageAndVersion, DirectoryEventHandler, NodeToControllerChannelManager, TopicIdPartition}
@@ -528,7 +529,7 @@ class BrokerServer(
config.nodeId,
sharedServer.metadataPublishingFaultHandler,
"broker",
- authorizer
+ authorizer.toJava
),
sharedServer.initialBrokerMetadataLoadFaultHandler,
sharedServer.metadataPublishingFaultHandler
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala
index 9e3439d61c3..5ea02cbe623 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -22,7 +22,7 @@ import kafka.raft.KafkaRaftManager
import kafka.server.QuotaFactory.QuotaManagers
import scala.collection.immutable
-import kafka.server.metadata.{AclPublisher, ClientQuotaMetadataManager, DelegationTokenPublisher, DynamicClientQuotaPublisher, DynamicConfigPublisher, DynamicTopicClusterQuotaPublisher, KRaftMetadataCache, KRaftMetadataCachePublisher, ScramPublisher}
+import kafka.server.metadata.{ClientQuotaMetadataManager, DelegationTokenPublisher, DynamicClientQuotaPublisher, DynamicConfigPublisher, DynamicTopicClusterQuotaPublisher, KRaftMetadataCache, KRaftMetadataCachePublisher, ScramPublisher}
import kafka.utils.{CoreUtils, Logging}
import org.apache.kafka.common.message.ApiMessageType.ListenerType
import org.apache.kafka.common.network.ListenerName
@@ -37,7 +37,7 @@ import org.apache.kafka.image.publisher.{ControllerRegistrationsPublisher, Metad
import org.apache.kafka.metadata.{KafkaConfigSchema, ListenerInfo}
import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata
-import org.apache.kafka.metadata.publisher.FeaturesPublisher
+import org.apache.kafka.metadata.publisher.{AclPublisher, FeaturesPublisher}
import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.security.CredentialProvider
import org.apache.kafka.server.authorizer.Authorizer
@@ -375,7 +375,7 @@ class ControllerServer(
config.nodeId,
sharedServer.metadataPublishingFaultHandler,
"controller",
- authorizer
+ authorizer.toJava
))
// Install all metadata publishers.
diff --git a/core/src/main/scala/kafka/server/metadata/AclPublisher.scala b/core/src/main/scala/kafka/server/metadata/AclPublisher.scala
deleted file mode 100644
index 43fb2058df3..00000000000
--- a/core/src/main/scala/kafka/server/metadata/AclPublisher.scala
+++ /dev/null
@@ -1,102 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server.metadata
-
-import kafka.utils.Logging
-import org.apache.kafka.image.loader.{LoaderManifest, LoaderManifestType}
-import org.apache.kafka.image.{MetadataDelta, MetadataImage}
-import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer
-import org.apache.kafka.server.authorizer.Authorizer
-import org.apache.kafka.server.fault.FaultHandler
-
-import scala.concurrent.TimeoutException
-
-
-class AclPublisher(
-  nodeId: Int,
-  faultHandler: FaultHandler,
-  nodeType: String,
-  authorizer: Option[Authorizer],
-) extends Logging with org.apache.kafka.image.publisher.MetadataPublisher {
-  logIdent = s"[${name()}] "
-
-  override def name(): String = s"AclPublisher $nodeType id=$nodeId"
-
-  private var completedInitialLoad = false
-
-  override def onMetadataUpdate(
-    delta: MetadataDelta,
-    newImage: MetadataImage,
-    manifest: LoaderManifest
-  ): Unit = {
-    val deltaName = s"MetadataDelta up to ${newImage.offset()}"
-
-    // Apply changes to ACLs. This needs to be handled carefully because while we are
-    // applying these changes, the Authorizer is continuing to return authorization
-    // results in other threads. We never want to expose an invalid state. For example,
-    // if the user created a DENY ALL acl and then created an ALLOW ACL for topic foo,
-    // we want to apply those changes in that order, not the reverse order! Otherwise
-    // there could be a window during which incorrect authorization results are returned.
-    Option(delta.aclsDelta()).foreach { aclsDelta =>
-      authorizer match {
-        case Some(authorizer: ClusterMetadataAuthorizer) => if (manifest.`type`().equals(LoaderManifestType.SNAPSHOT)) {
-          try {
-            // If the delta resulted from a snapshot load, we want to apply the new changes
-            // all at once using ClusterMetadataAuthorizer#loadSnapshot. If this is the
-            // first snapshot load, it will also complete the futures returned by
-            // Authorizer#start (which we wait for before processing RPCs).
-            info(s"Loading authorizer snapshot at offset ${newImage.offset()}")
-            authorizer.loadSnapshot(newImage.acls().acls())
-          } catch {
-            case t: Throwable => faultHandler.handleFault("Error loading " +
-              s"authorizer snapshot in $deltaName", t)
-          }
-        } else {
-          try {
-            // Because the changes map is a LinkedHashMap, the deltas will be returned in
-            // the order they were performed.
-            aclsDelta.changes().forEach((key, value) =>
-              if (value.isPresent) {
-                authorizer.addAcl(key, value.get())
-              } else {
-                authorizer.removeAcl(key)
-              })
-          } catch {
-            case t: Throwable => faultHandler.handleFault("Error loading " +
-              s"authorizer changes in $deltaName", t)
-          }
-        }
-        if (!completedInitialLoad) {
-          // If we are receiving this onMetadataUpdate call, that means the MetadataLoader has
-          // loaded up to the local high water mark. So we complete the initial load, enabling
-          // the authorizer.
-          completedInitialLoad = true
-          authorizer.completeInitialLoad()
-        }
-        case _ => // No ClusterMetadataAuthorizer is configured. There is nothing to do.
-      }
-    }
-  }
-
-  override def close(): Unit = {
-    authorizer match {
-      case Some(authorizer: ClusterMetadataAuthorizer) => authorizer.completeInitialLoad(new TimeoutException)
-      case _ =>
-    }
-  }
-}
diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
index 1985f04348f..8eb0f45def5 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
@@ -30,6 +30,7 @@ import org.apache.kafka.coordinator.share.ShareCoordinator
import org.apache.kafka.image.loader.LoaderManifest
import org.apache.kafka.image.publisher.MetadataPublisher
import org.apache.kafka.image.{MetadataDelta, MetadataImage, TopicDelta}
+import org.apache.kafka.metadata.publisher.AclPublisher
import org.apache.kafka.server.common.RequestLocal
import org.apache.kafka.server.fault.FaultHandler
diff --git a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
index a166368a5aa..575d3855dcd 100644
--- a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
+++ b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
@@ -35,6 +35,7 @@ import org.apache.kafka.coordinator.group.GroupCoordinator
import org.apache.kafka.coordinator.share.ShareCoordinator
 import org.apache.kafka.image.{MetadataDelta, MetadataImage, MetadataImageTest, MetadataProvenance}
import org.apache.kafka.image.loader.LogDeltaManifest
+import org.apache.kafka.metadata.publisher.AclPublisher
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.raft.LeaderAndEpoch
import org.apache.kafka.server.common.{KRaftVersion, MetadataVersion}
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/publisher/AclPublisher.java b/metadata/src/main/java/org/apache/kafka/metadata/publisher/AclPublisher.java
new file mode 100644
index 00000000000..cbfdb0f68c2
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/metadata/publisher/AclPublisher.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.metadata.publisher;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.loader.LoaderManifest;
+import org.apache.kafka.image.loader.LoaderManifestType;
+import org.apache.kafka.image.publisher.MetadataPublisher;
+import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer;
+import org.apache.kafka.server.authorizer.Authorizer;
+import org.apache.kafka.server.fault.FaultHandler;
+
+import org.slf4j.Logger;
+
+import java.util.Optional;
+import java.util.concurrent.TimeoutException;
+
+public class AclPublisher implements MetadataPublisher {
+    private final Logger log;
+    private final int nodeId;
+    private final FaultHandler faultHandler;
+    private final String nodeType;
+    private final Optional<ClusterMetadataAuthorizer> authorizer;
+    private boolean completedInitialLoad = false;
+
+    public AclPublisher(int nodeId, FaultHandler faultHandler, String nodeType, Optional<Authorizer> authorizer) {
+        this.nodeId = nodeId;
+        this.faultHandler = faultHandler;
+        this.nodeType = nodeType;
+        this.authorizer = authorizer.filter(ClusterMetadataAuthorizer.class::isInstance).map(ClusterMetadataAuthorizer.class::cast);
+        this.log = new LogContext(name()).logger(AclPublisher.class);
+    }
+
+    @Override
+    public final String name() {
+        return "AclPublisher " + nodeType + " id=" + nodeId;
+    }
+
+    @Override
+    public void onMetadataUpdate(MetadataDelta delta, MetadataImage newImage, LoaderManifest manifest) {
+        String deltaName = "MetadataDelta up to " + newImage.offset();
+
+        // Apply changes to ACLs. This needs to be handled carefully because while we are
+        // applying these changes, the Authorizer is continuing to return authorization
+        // results in other threads. We never want to expose an invalid state. For example,
+        // if the user created a DENY ALL acl and then created an ALLOW ACL for topic foo,
+        // we want to apply those changes in that order, not the reverse order! Otherwise
+        // there could be a window during which incorrect authorization results are returned.
+        Optional.ofNullable(delta.aclsDelta()).ifPresent(aclsDelta -> {
+            authorizer.ifPresent(clusterMetadataAuthorizer -> {
+                if (manifest.type().equals(LoaderManifestType.SNAPSHOT)) {
+                    try {
+                        // If the delta resulted from a snapshot load, we want to apply the new changes
+                        // all at once using ClusterMetadataAuthorizer#loadSnapshot. If this is the
+                        // first snapshot load, it will also complete the futures returned by
+                        // Authorizer#start (which we wait for before processing RPCs).
+                        log.info("Loading authorizer snapshot at offset {}", newImage.offset());
+                        clusterMetadataAuthorizer.loadSnapshot(newImage.acls().acls());
+                    } catch (Throwable t) {
+                        faultHandler.handleFault("Error loading authorizer snapshot in " + deltaName, t);
+                    }
+                } else {
+                    try {
+                        // Because the changes map is a LinkedHashMap, the deltas will be returned in
+                        // the order they were performed.
+                        aclsDelta.changes().forEach((key, value) -> {
+                            if (value.isPresent()) {
+                                clusterMetadataAuthorizer.addAcl(key, value.get());
+                            } else {
+                                clusterMetadataAuthorizer.removeAcl(key);
+                            }
+                        });
+                    } catch (Throwable t) {
+                        faultHandler.handleFault("Error loading authorizer changes in " + deltaName, t);
+                    }
+                }
+                if (!completedInitialLoad) {
+                    // If we are receiving this onMetadataUpdate call, that means the MetadataLoader has
+                    // loaded up to the local high water mark. So we complete the initial load, enabling
+                    // the authorizer.
+                    completedInitialLoad = true;
+                    clusterMetadataAuthorizer.completeInitialLoad();
+                }
+            });
+        });
+    }
+
+    @Override
+    public void close() {
+        authorizer.ifPresent(clusterMetadataAuthorizer -> clusterMetadataAuthorizer.completeInitialLoad(new TimeoutException()));
+    }
+}
\ No newline at end of file