This is an automated email from the ASF dual-hosted git repository. chia7712 pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 486b991f221 KAFKA-18711 Move DelegationTokenPublisher to metadata module (#20475) 486b991f221 is described below commit 486b991f221dd75637402e2ff0a10d0a482e6547 Author: Maros Orsak <maros.orsak...@gmail.com> AuthorDate: Wed Sep 24 14:19:08 2025 +0200 KAFKA-18711 Move DelegationTokenPublisher to metadata module (#20475) Basically, one of the refactor tasks. In this PR, I have moved `DelegationTokenPublisher` to the metadata module. Similar to the `ScramPublisher` migration (commit feee50f476), I have moved `DelegationTokenManager` to the server-common module, as it would otherwise create a circular dependency. Moreover, I have made multiple changes throughout the codebase to reference `DelegationTokenManager` from server-common instead of the server module. Reviewers: Mickael Maison <mickael.mai...@gmail.com>, Chia-Ping Tsai <chia7...@gmail.com> --- checkstyle/import-control-server-common.xml | 4 ++ .../kafka/server/builders/KafkaApisBuilder.java | 2 +- .../src/main/scala/kafka/server/BrokerServer.scala | 8 +-- .../main/scala/kafka/server/ControllerApis.scala | 3 +- .../main/scala/kafka/server/ControllerServer.scala | 13 ++-- core/src/main/scala/kafka/server/KafkaApis.scala | 3 +- .../server/metadata/BrokerMetadataPublisher.scala | 4 +- .../server/metadata/DelegationTokenPublisher.scala | 83 ---------------------- .../metadata/BrokerMetadataPublisherTest.scala | 2 +- .../publisher/DelegationTokenPublisher.java | 73 +++++++++++++++++++ .../kafka/security}/DelegationTokenManager.java | 2 +- .../config/DelegationTokenManagerConfigs.java | 0 12 files changed, 97 insertions(+), 100 deletions(-) diff --git a/checkstyle/import-control-server-common.xml b/checkstyle/import-control-server-common.xml index 21b13ed91d2..95a014b87e4 100644 --- a/checkstyle/import-control-server-common.xml +++ b/checkstyle/import-control-server-common.xml @@ -33,6 +33,7 @@ <allow pkg="javax.net.ssl" /> <allow pkg="javax.security" /> <allow pkg="net.jqwik.api" /> + <allow pkg="javax.crypto" /> <!-- no one depends on the server --> <disallow pkg="kafka" /> @@ -49,6 +50,9 @@ <!-- persistent collection factories/non-library-specific wrappers --> <allow pkg="org.apache.kafka.server.immutable" exact-match="true" /> + <!-- allow config classes for server package --> + <allow pkg="org.apache.kafka.server.config" /> + <subpackage name="queue"> <allow pkg="org.apache.kafka.test" /> </subpackage> diff --git a/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java b/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java index ecbb6c8b154..e03ab35e90e 100644 --- a/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java +++ b/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java @@ -36,9 +36,9 @@ import org.apache.kafka.coordinator.group.GroupCoordinator; import org.apache.kafka.coordinator.share.ShareCoordinator; import org.apache.kafka.metadata.ConfigRepository; import org.apache.kafka.metadata.MetadataCache; +import org.apache.kafka.security.DelegationTokenManager; import org.apache.kafka.server.ApiVersionManager; import org.apache.kafka.server.ClientMetricsManager; -import org.apache.kafka.server.DelegationTokenManager; import org.apache.kafka.server.authorizer.Authorizer; import org.apache.kafka.storage.log.metrics.BrokerTopicStats; diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index 47085169979..689c62b8687 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -42,8 +42,8 @@ import org.apache.kafka.coordinator.share.{ShareCoordinator, ShareCoordinatorRec import org.apache.kafka.coordinator.transaction.ProducerIdManager import org.apache.kafka.image.publisher.{BrokerRegistrationTracker, MetadataPublisher} import org.apache.kafka.metadata.{BrokerState, ListenerInfo} -import org.apache.kafka.metadata.publisher.{AclPublisher, ScramPublisher} -import org.apache.kafka.security.CredentialProvider +import org.apache.kafka.metadata.publisher.{AclPublisher, DelegationTokenPublisher, ScramPublisher} +import org.apache.kafka.security.{CredentialProvider, DelegationTokenManager} import org.apache.kafka.server.authorizer.Authorizer import org.apache.kafka.server.common.{ApiMessageAndVersion, DirectoryEventHandler, NodeToControllerChannelManager, TopicIdPartition} import org.apache.kafka.server.config.{ConfigType, DelegationTokenManagerConfigs} @@ -54,7 +54,7 @@ import org.apache.kafka.server.share.persister.{DefaultStatePersister, NoOpState import org.apache.kafka.server.share.session.ShareSessionCache import org.apache.kafka.server.util.timer.{SystemTimer, SystemTimerReaper} import org.apache.kafka.server.util.{Deadline, FutureUtils, KafkaScheduler} -import org.apache.kafka.server.{AssignmentsManager, BrokerFeatures, ClientMetricsManager, DefaultApiVersionManager, DelayedActionQueue, DelegationTokenManager, ProcessRole} +import org.apache.kafka.server.{AssignmentsManager, BrokerFeatures, ClientMetricsManager, DefaultApiVersionManager, DelayedActionQueue, ProcessRole} import org.apache.kafka.server.transaction.AddPartitionsToTxnManager import org.apache.kafka.storage.internals.log.LogDirFailureChannel import org.apache.kafka.storage.log.metrics.BrokerTopicStats @@ -502,7 +502,7 @@ class BrokerServer( "broker", credentialProvider), new DelegationTokenPublisher( - config, + config.nodeId, sharedServer.metadataPublishingFaultHandler, "broker", tokenManager), diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala b/core/src/main/scala/kafka/server/ControllerApis.scala index ac9a2d9eff1..f10b769d9c1 100644 --- a/core/src/main/scala/kafka/server/ControllerApis.scala +++ b/core/src/main/scala/kafka/server/ControllerApis.scala @@ -55,7 +55,8 @@ import org.apache.kafka.metadata.{BrokerHeartbeatReply, BrokerRegistrationReply} import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.raft.RaftManager -import org.apache.kafka.server.{ApiVersionManager, DelegationTokenManager, ProcessRole} +import org.apache.kafka.security.DelegationTokenManager +import org.apache.kafka.server.{ApiVersionManager, ProcessRole} import org.apache.kafka.server.authorizer.Authorizer import org.apache.kafka.server.common.{ApiMessageAndVersion, RequestLocal} import org.apache.kafka.server.quota.ControllerMutationQuota diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala index badcb9b2d8a..e41705ed3ba 100644 --- a/core/src/main/scala/kafka/server/ControllerServer.scala +++ b/core/src/main/scala/kafka/server/ControllerServer.scala @@ -22,7 +22,7 @@ import kafka.raft.KafkaRaftManager import kafka.server.QuotaFactory.QuotaManagers import scala.collection.immutable -import kafka.server.metadata.{ClientQuotaMetadataManager, DelegationTokenPublisher, DynamicClientQuotaPublisher, DynamicConfigPublisher, DynamicTopicClusterQuotaPublisher, KRaftMetadataCache, KRaftMetadataCachePublisher} +import kafka.server.metadata.{ClientQuotaMetadataManager, DynamicClientQuotaPublisher, DynamicConfigPublisher, DynamicTopicClusterQuotaPublisher, KRaftMetadataCache, KRaftMetadataCachePublisher} import kafka.utils.{CoreUtils, Logging} import org.apache.kafka.common.internals.Plugin import org.apache.kafka.common.message.ApiMessageType.ListenerType @@ -38,14 +38,15 @@ import org.apache.kafka.image.publisher.{ControllerRegistrationsPublisher, Metad import org.apache.kafka.metadata.{KafkaConfigSchema, ListenerInfo} import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer import org.apache.kafka.metadata.bootstrap.BootstrapMetadata -import org.apache.kafka.metadata.publisher.{AclPublisher, FeaturesPublisher, ScramPublisher} +import org.apache.kafka.metadata.publisher.{AclPublisher, DelegationTokenPublisher, FeaturesPublisher, ScramPublisher} import org.apache.kafka.raft.QuorumConfig -import org.apache.kafka.security.CredentialProvider -import org.apache.kafka.server.{DelegationTokenManager, ProcessRole, SimpleApiVersionManager} +import org.apache.kafka.security.{CredentialProvider, DelegationTokenManager} +import org.apache.kafka.server.{ProcessRole, SimpleApiVersionManager} import org.apache.kafka.server.authorizer.Authorizer import org.apache.kafka.server.config.ServerLogConfigs.{ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG, CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG} import org.apache.kafka.server.common.{ApiMessageAndVersion, KRaftVersion, NodeToControllerChannelManager} -import org.apache.kafka.server.config.{ConfigType, DelegationTokenManagerConfigs} +import org.apache.kafka.server.config.ConfigType +import org.apache.kafka.server.config.DelegationTokenManagerConfigs import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, LinuxIoMetricsCollector} import org.apache.kafka.server.network.{EndpointReadyFutures, KafkaAuthorizerServerInfo} import org.apache.kafka.server.policy.{AlterConfigPolicy, CreateTopicPolicy} @@ -360,7 +361,7 @@ class ControllerServer( // We need a tokenManager for the Publisher // The tokenCache in the tokenManager is the same used in DelegationTokenControlManager metadataPublishers.add(new DelegationTokenPublisher( - config, + config.nodeId, sharedServer.metadataPublishingFaultHandler, "controller", new DelegationTokenManager(delegationTokenManagerConfigs, tokenCache) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index d3935c8e507..4cbef3fa648 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -60,7 +60,8 @@ import org.apache.kafka.common.{Node, TopicIdPartition, TopicPartition, Uuid} import org.apache.kafka.coordinator.group.{Group, GroupConfig, GroupConfigManager, GroupCoordinator} import org.apache.kafka.coordinator.share.ShareCoordinator import org.apache.kafka.metadata.{ConfigRepository, MetadataCache} -import org.apache.kafka.server.{ApiVersionManager, ClientMetricsManager, DelegationTokenManager, ProcessRole} +import org.apache.kafka.security.DelegationTokenManager +import org.apache.kafka.server.{ApiVersionManager, ClientMetricsManager, ProcessRole} import org.apache.kafka.server.authorizer._ import org.apache.kafka.server.common.{GroupVersion, RequestLocal, ShareVersion, StreamsVersion, TransactionVersion} import org.apache.kafka.server.config.DelegationTokenManagerConfigs diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala index 30ea835b5be..8df8a275580 100644 --- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala +++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala @@ -33,7 +33,7 @@ import org.apache.kafka.coordinator.transaction.TransactionLogConfig import org.apache.kafka.image.loader.LoaderManifest import org.apache.kafka.image.publisher.MetadataPublisher import org.apache.kafka.image.{MetadataDelta, MetadataImage, TopicDelta} -import org.apache.kafka.metadata.publisher.{AclPublisher, ScramPublisher} +import org.apache.kafka.metadata.publisher.{AclPublisher, DelegationTokenPublisher, ScramPublisher} import org.apache.kafka.server.common.MetadataVersion.MINIMUM_VERSION import org.apache.kafka.server.common.{FinalizedFeatures, RequestLocal, ShareVersion} import org.apache.kafka.server.fault.FaultHandler @@ -227,7 +227,7 @@ class BrokerMetadataPublisher( scramPublisher.onMetadataUpdate(delta, newImage, manifest) // Apply DelegationToken delta. - delegationTokenPublisher.onMetadataUpdate(delta, newImage) + delegationTokenPublisher.onMetadataUpdate(delta, newImage, manifest) // Apply ACL delta. aclPublisher.onMetadataUpdate(delta, newImage, manifest) diff --git a/core/src/main/scala/kafka/server/metadata/DelegationTokenPublisher.scala b/core/src/main/scala/kafka/server/metadata/DelegationTokenPublisher.scala deleted file mode 100644 index 0e12c34b3c5..00000000000 --- a/core/src/main/scala/kafka/server/metadata/DelegationTokenPublisher.scala +++ /dev/null @@ -1,83 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.server.metadata - -import kafka.server.KafkaConfig -import kafka.utils.Logging -import org.apache.kafka.image.loader.LoaderManifest -import org.apache.kafka.image.{MetadataDelta, MetadataImage} -import org.apache.kafka.server.DelegationTokenManager -import org.apache.kafka.server.fault.FaultHandler - - -class DelegationTokenPublisher( - conf: KafkaConfig, - faultHandler: FaultHandler, - nodeType: String, - tokenManager: DelegationTokenManager, -) extends Logging with org.apache.kafka.image.publisher.MetadataPublisher { - logIdent = s"[${name()}] " - - var _firstPublish = true - - override def name(): String = s"DelegationTokenPublisher $nodeType id=${conf.nodeId}" - - override def onMetadataUpdate( - delta: MetadataDelta, - newImage: MetadataImage, - manifest: LoaderManifest - ): Unit = { - onMetadataUpdate(delta, newImage) - } - - def onMetadataUpdate( - delta: MetadataDelta, - newImage: MetadataImage, - ): Unit = { - val deltaName = if (_firstPublish) { - s"initial MetadataDelta up to ${newImage.highestOffsetAndEpoch().offset}" - } else { - s"update MetadataDelta up to ${newImage.highestOffsetAndEpoch().offset}" - } - try { - if (_firstPublish) { - // Initialize the tokenCache with the Image - Option(newImage.delegationTokens()).foreach { delegationTokenImage => - delegationTokenImage.tokens().forEach { (_, delegationTokenData) => - tokenManager.updateToken(tokenManager.getDelegationToken(delegationTokenData.tokenInformation())) - } - } - _firstPublish = false - } - // Apply changes to DelegationTokens. - Option(delta.delegationTokenDelta()).foreach { delegationTokenDelta => - delegationTokenDelta.changes().forEach { - case (tokenId, delegationTokenData) => - if (delegationTokenData.isPresent) { - tokenManager.updateToken(tokenManager.getDelegationToken(delegationTokenData.get().tokenInformation())) - } else { - tokenManager.removeToken(tokenId) - } - } - } - } catch { - case t: Throwable => faultHandler.handleFault("Uncaught exception while " + - s"publishing DelegationToken changes from $deltaName", t) - } - } -} diff --git a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala index 828ca0d7ad4..32727a4c3cc 100644 --- a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala +++ b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala @@ -40,7 +40,7 @@ import org.apache.kafka.coordinator.group.GroupCoordinator import org.apache.kafka.coordinator.share.ShareCoordinator import org.apache.kafka.image.{AclsImage, ClientQuotasImage, ClusterImageTest, ConfigurationsImage, DelegationTokenImage, FeaturesImage, MetadataDelta, MetadataImage, MetadataImageTest, MetadataProvenance, ProducerIdsImage, ScramImage, TopicsImage} import org.apache.kafka.image.loader.LogDeltaManifest -import org.apache.kafka.metadata.publisher.{AclPublisher, ScramPublisher} +import org.apache.kafka.metadata.publisher.{AclPublisher, DelegationTokenPublisher, ScramPublisher} import org.apache.kafka.network.SocketServerConfigs import org.apache.kafka.raft.LeaderAndEpoch import org.apache.kafka.server.common.{KRaftVersion, MetadataVersion, ShareVersion} diff --git a/metadata/src/main/java/org/apache/kafka/metadata/publisher/DelegationTokenPublisher.java b/metadata/src/main/java/org/apache/kafka/metadata/publisher/DelegationTokenPublisher.java new file mode 100644 index 00000000000..347b0d7f531 --- /dev/null +++ b/metadata/src/main/java/org/apache/kafka/metadata/publisher/DelegationTokenPublisher.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.metadata.publisher; + +import org.apache.kafka.image.DelegationTokenImage; +import org.apache.kafka.image.MetadataDelta; +import org.apache.kafka.image.MetadataImage; +import org.apache.kafka.image.loader.LoaderManifest; +import org.apache.kafka.image.publisher.MetadataPublisher; +import org.apache.kafka.security.DelegationTokenManager; +import org.apache.kafka.server.fault.FaultHandler; + +public class DelegationTokenPublisher implements MetadataPublisher { + private final int nodeId; + private final FaultHandler faultHandler; + private final String nodeType; + private final DelegationTokenManager tokenManager; + private boolean firstPublish = true; + + public DelegationTokenPublisher(int nodeId, FaultHandler faultHandler, String nodeType, DelegationTokenManager tokenManager) { + this.nodeId = nodeId; + this.faultHandler = faultHandler; + this.nodeType = nodeType; + this.tokenManager = tokenManager; + } + + @Override + public final String name() { + return "DelegationTokenPublisher " + nodeType + " id=" + nodeId; + } + + @Override + public void onMetadataUpdate(MetadataDelta delta, MetadataImage newImage, LoaderManifest manifest) { + var first = firstPublish; + try { + if (firstPublish) { + // Initialize the tokenCache with the Image + DelegationTokenImage delegationTokenImage = newImage.delegationTokens(); + for (var token : delegationTokenImage.tokens().entrySet()) { + tokenManager.updateToken(tokenManager.getDelegationToken(token.getValue().tokenInformation())); + } + firstPublish = false; + } + // Apply changes to DelegationTokens. + for (var token : delta.getOrCreateDelegationTokenDelta().changes().entrySet()) { + var tokenId = token.getKey(); + var delegationTokenData = token.getValue(); + if (delegationTokenData.isPresent()) + tokenManager.updateToken(tokenManager.getDelegationToken(delegationTokenData.get().tokenInformation())); + else + tokenManager.removeToken(tokenId); + } + } catch (Throwable t) { + var msg = String.format("Uncaught exception while publishing DelegationToken changes from %s MetadataDelta up to %s", + first ? "initial" : "update", newImage.highestOffsetAndEpoch().offset()); + faultHandler.handleFault(msg, t); + } + } +} \ No newline at end of file diff --git a/server/src/main/java/org/apache/kafka/server/DelegationTokenManager.java b/server-common/src/main/java/org/apache/kafka/security/DelegationTokenManager.java similarity index 99% rename from server/src/main/java/org/apache/kafka/server/DelegationTokenManager.java rename to server-common/src/main/java/org/apache/kafka/security/DelegationTokenManager.java index 54832fbd502..ef82a0702c8 100644 --- a/server/src/main/java/org/apache/kafka/server/DelegationTokenManager.java +++ b/server-common/src/main/java/org/apache/kafka/security/DelegationTokenManager.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.kafka.server; +package org.apache.kafka.security; import org.apache.kafka.common.security.auth.KafkaPrincipal; import org.apache.kafka.common.security.scram.ScramCredential; diff --git a/server/src/main/java/org/apache/kafka/server/config/DelegationTokenManagerConfigs.java b/server-common/src/main/java/org/apache/kafka/server/config/DelegationTokenManagerConfigs.java similarity index 100% rename from server/src/main/java/org/apache/kafka/server/config/DelegationTokenManagerConfigs.java rename to server-common/src/main/java/org/apache/kafka/server/config/DelegationTokenManagerConfigs.java