This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 9318b591d7a KAFKA-15318: Update the Authorizer via AclPublisher (#14169) 9318b591d7a is described below commit 9318b591d7a57b9db1e7519986d78f0402cd5b5e Author: Colin Patrick McCabe <cmcc...@apache.org> AuthorDate: Wed Aug 9 23:54:46 2023 -0700 KAFKA-15318: Update the Authorizer via AclPublisher (#14169) On the controller, move publishing acls to the Authorizer into a dedicated MetadataPublisher, AclPublisher. This publisher listens for notifications from MetadataLoader, and receives only committed data. This brings the controller side in line with how the broker has always worked. It also avoids some ugly code related to publishing directly from the QuorumController. Most important of all, it clears the way to implement metadata transactions without worrying about Authorizer state (since it will be handled by the MetadataLoader, along with other metadata image state). In AclsDelta, we can remove isSnapshotDelta. We always know when the MetadataLoader is giving us a snapshot. Also bring AclsDelta in line with the other delta classes, where completeSnapshot calculates the diff between the previous image and the next one. We don't use this delta (since we just apply the image directly to the authorizer) but we should have it, for consistency. Finally, change MockAclMutator to avoid the need to subclass AclControlManager. Reviewers: David Arthur <mum...@gmail.com> --- .../src/main/scala/kafka/server/BrokerServer.scala | 18 ++-- .../main/scala/kafka/server/ControllerServer.scala | 19 +++- .../scala/kafka/server/metadata/AclPublisher.scala | 102 +++++++++++++++++++++ .../server/metadata/BrokerMetadataPublisher.scala | 43 +-------- .../kafka/security/authorizer/AuthorizerTest.scala | 3 +- .../metadata/BrokerMetadataPublisherTest.scala | 2 +- .../apache/kafka/controller/AclControlManager.java | 49 ++-------- .../apache/kafka/controller/QuorumController.java | 61 +----------- .../java/org/apache/kafka/image/AclsDelta.java | 27 +++--- .../kafka/controller/AclControlManagerTest.java | 6 +- .../kafka/controller/MockAclControlManager.java | 50 ---------- .../apache/kafka/controller/MockAclMutator.java | 92 +++++++++++++++++++ .../kafka/controller/QuorumControllerTest.java | 38 -------- .../org/apache/kafka/metadata/RecordTestUtils.java | 16 +--- .../kafka/metadata/authorizer/MockAclMutator.java | 62 ------------- 15 files changed, 252 insertions(+), 336 deletions(-) diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index 825d9eb8c19..47f4fee59bf 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -25,7 +25,7 @@ import kafka.log.remote.RemoteLogManager import kafka.network.{DataPlaneAcceptor, SocketServer} import kafka.raft.KafkaRaftManager import kafka.security.CredentialProvider -import kafka.server.metadata.{BrokerMetadataPublisher, ClientQuotaMetadataManager, DynamicClientQuotaPublisher, DynamicConfigPublisher, KRaftMetadataCache, ScramPublisher} +import kafka.server.metadata.{AclPublisher, BrokerMetadataPublisher, ClientQuotaMetadataManager, DynamicClientQuotaPublisher, DynamicConfigPublisher, KRaftMetadataCache, ScramPublisher} import kafka.utils.CoreUtils import org.apache.kafka.clients.NetworkClient import org.apache.kafka.common.config.ConfigException @@ -42,7 +42,6 @@ import org.apache.kafka.coordinator.group import org.apache.kafka.coordinator.group.util.SystemTimerReaper import org.apache.kafka.coordinator.group.{GroupCoordinator, GroupCoordinatorConfig, GroupCoordinatorService, RecordSerde} import org.apache.kafka.image.publisher.MetadataPublisher -import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer import org.apache.kafka.metadata.{BrokerState, VersionRange} import org.apache.kafka.raft.RaftConfig import org.apache.kafka.server.authorizer.Authorizer @@ -419,7 +418,12 @@ class BrokerServer( sharedServer.metadataPublishingFaultHandler, "broker", credentialProvider), - authorizer, + new AclPublisher( + config.nodeId, + sharedServer.metadataPublishingFaultHandler, + "broker", + authorizer + ), sharedServer.initialBrokerMetadataLoadFaultHandler, sharedServer.metadataPublishingFaultHandler) metadataPublishers.add(brokerMetadataPublisher) @@ -468,14 +472,6 @@ class BrokerServer( rlm.startup() }) - // If we are using a ClusterMetadataAuthorizer which stores its ACLs in the metadata log, - // notify it that the loading process is complete. - authorizer match { - case Some(clusterMetadataAuthorizer: ClusterMetadataAuthorizer) => - clusterMetadataAuthorizer.completeInitialLoad() - case _ => // nothing to do - } - // We're now ready to unfence the broker. This also allows this broker to transition // from RECOVERY state to RUNNING state, once the controller unfences the broker. FutureUtils.waitWithLogging(logger.underlying, logIdent, diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala index 3a98fbaf587..78045a6985d 100644 --- a/core/src/main/scala/kafka/server/ControllerServer.scala +++ b/core/src/main/scala/kafka/server/ControllerServer.scala @@ -26,7 +26,7 @@ import kafka.server.KafkaConfig.{AlterConfigPolicyClassNameProp, CreateTopicPoli import kafka.server.QuotaFactory.QuotaManagers import scala.collection.immutable -import kafka.server.metadata.{ClientQuotaMetadataManager, DynamicClientQuotaPublisher, DynamicConfigPublisher, ScramPublisher} +import kafka.server.metadata.{AclPublisher, ClientQuotaMetadataManager, DynamicClientQuotaPublisher, DynamicConfigPublisher, ScramPublisher} import kafka.utils.{CoreUtils, Logging, PasswordEncoder} import kafka.zk.{KafkaZkClient, ZkMigrationClient} import org.apache.kafka.common.config.ConfigException @@ -238,11 +238,14 @@ class ControllerServer( setNonFatalFaultHandler(sharedServer.nonFatalQuorumControllerFaultHandler). setZkMigrationEnabled(config.migrationEnabled) } + controller = controllerBuilder.build() + + // If we are using a ClusterMetadataAuthorizer, requests to add or remove ACLs must go + // through the controller. authorizer match { - case Some(a: ClusterMetadataAuthorizer) => controllerBuilder.setAuthorizer(a) - case _ => // nothing to do + case Some(a: ClusterMetadataAuthorizer) => a.setAclMutator(controller) + case _ => } - controller = controllerBuilder.build() if (config.migrationEnabled) { val zkClient = KafkaZkClient.createZkClient("KRaft Migration", time, config, KafkaServer.zkClientConfigFromKafkaConfig(config)) @@ -361,6 +364,14 @@ class ControllerServer( sharedServer.metadataPublishingFaultHandler )) + // Set up the ACL publisher. + metadataPublishers.add(new AclPublisher( + config.nodeId, + sharedServer.metadataPublishingFaultHandler, + "controller", + authorizer + )) + // Install all metadata publishers. FutureUtils.waitWithLogging(logger.underlying, logIdent, "the controller metadata publishers to be installed", diff --git a/core/src/main/scala/kafka/server/metadata/AclPublisher.scala b/core/src/main/scala/kafka/server/metadata/AclPublisher.scala new file mode 100644 index 00000000000..819fcc3d38d --- /dev/null +++ b/core/src/main/scala/kafka/server/metadata/AclPublisher.scala @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server.metadata + +import kafka.utils.Logging +import org.apache.kafka.image.loader.{LoaderManifest, LoaderManifestType} +import org.apache.kafka.image.{MetadataDelta, MetadataImage} +import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer +import org.apache.kafka.server.authorizer.Authorizer +import org.apache.kafka.server.fault.FaultHandler + +import scala.concurrent.TimeoutException + + +class AclPublisher( + nodeId: Int, + faultHandler: FaultHandler, + nodeType: String, + authorizer: Option[Authorizer], +) extends Logging with org.apache.kafka.image.publisher.MetadataPublisher { + logIdent = s"[${name()}] " + + override def name(): String = s"AclPublisher ${nodeType} id=${nodeId}" + + var completedInitialLoad = false + + override def onMetadataUpdate( + delta: MetadataDelta, + newImage: MetadataImage, + manifest: LoaderManifest + ): Unit = { + val deltaName = s"MetadataDelta up to ${newImage.offset()}" + + // Apply changes to ACLs. This needs to be handled carefully because while we are + // applying these changes, the Authorizer is continuing to return authorization + // results in other threads. We never want to expose an invalid state. For example, + // if the user created a DENY ALL acl and then created an ALLOW ACL for topic foo, + // we want to apply those changes in that order, not the reverse order! Otherwise + // there could be a window during which incorrect authorization results are returned. + Option(delta.aclsDelta()).foreach { aclsDelta => + authorizer match { + case Some(authorizer: ClusterMetadataAuthorizer) => if (manifest.`type`().equals(LoaderManifestType.SNAPSHOT)) { + try { + // If the delta resulted from a snapshot load, we want to apply the new changes + // all at once using ClusterMetadataAuthorizer#loadSnapshot. If this is the + // first snapshot load, it will also complete the futures returned by + // Authorizer#start (which we wait for before processing RPCs). + info(s"Loading authorizer snapshot at offset ${newImage.offset()}") + authorizer.loadSnapshot(newImage.acls().acls()) + } catch { + case t: Throwable => faultHandler.handleFault("Error loading " + + s"authorizer snapshot in $deltaName", t) + } + } else { + try { + // Because the changes map is a LinkedHashMap, the deltas will be returned in + // the order they were performed. + aclsDelta.changes().entrySet().forEach(e => + if (e.getValue.isPresent) { + authorizer.addAcl(e.getKey, e.getValue.get()) + } else { + authorizer.removeAcl(e.getKey) + }) + } catch { + case t: Throwable => faultHandler.handleFault("Error loading " + + s"authorizer changes in $deltaName", t) + } + } + if (!completedInitialLoad) { + // If we are receiving this onMetadataUpdate call, that means the MetadataLoader has + // loaded up to the local high water mark. So we complete the initial load, enabling + // the authorizer. + completedInitialLoad = true + authorizer.completeInitialLoad() + } + case _ => // No ClusterMetadataAuthorizer is configured. There is nothing to do. + } + } + } + + override def close(): Unit = { + authorizer match { + case Some(authorizer: ClusterMetadataAuthorizer) => authorizer.completeInitialLoad(new TimeoutException) + case _ => + } + } +} diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala index e1bf2e89607..5cfc40a2dd8 100644 --- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala +++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala @@ -29,8 +29,6 @@ import org.apache.kafka.coordinator.group.GroupCoordinator import org.apache.kafka.image.loader.LoaderManifest import org.apache.kafka.image.publisher.MetadataPublisher import org.apache.kafka.image.{MetadataDelta, MetadataImage, TopicDelta, TopicsImage} -import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer -import org.apache.kafka.server.authorizer.Authorizer import org.apache.kafka.server.fault.FaultHandler import java.util.concurrent.CompletableFuture @@ -107,7 +105,7 @@ class BrokerMetadataPublisher( var dynamicConfigPublisher: DynamicConfigPublisher, dynamicClientQuotaPublisher: DynamicClientQuotaPublisher, scramPublisher: ScramPublisher, - private val _authorizer: Option[Authorizer], + aclPublisher: AclPublisher, fatalFaultHandler: FaultHandler, metadataPublishingFaultHandler: FaultHandler ) extends MetadataPublisher with Logging { @@ -229,43 +227,8 @@ class BrokerMetadataPublisher( // Apply SCRAM delta. scramPublisher.onMetadataUpdate(delta, newImage) - // Apply changes to ACLs. This needs to be handled carefully because while we are - // applying these changes, the Authorizer is continuing to return authorization - // results in other threads. We never want to expose an invalid state. For example, - // if the user created a DENY ALL acl and then created an ALLOW ACL for topic foo, - // we want to apply those changes in that order, not the reverse order! Otherwise - // there could be a window during which incorrect authorization results are returned. - Option(delta.aclsDelta()).foreach { aclsDelta => - _authorizer match { - case Some(authorizer: ClusterMetadataAuthorizer) => if (aclsDelta.isSnapshotDelta) { - try { - // If the delta resulted from a snapshot load, we want to apply the new changes - // all at once using ClusterMetadataAuthorizer#loadSnapshot. If this is the - // first snapshot load, it will also complete the futures returned by - // Authorizer#start (which we wait for before processing RPCs). - authorizer.loadSnapshot(newImage.acls().acls()) - } catch { - case t: Throwable => metadataPublishingFaultHandler.handleFault("Error loading " + - s"authorizer snapshot in $deltaName", t) - } - } else { - try { - // Because the changes map is a LinkedHashMap, the deltas will be returned in - // the order they were performed. - aclsDelta.changes().entrySet().forEach(e => - if (e.getValue.isPresent) { - authorizer.addAcl(e.getKey, e.getValue.get()) - } else { - authorizer.removeAcl(e.getKey) - }) - } catch { - case t: Throwable => metadataPublishingFaultHandler.handleFault("Error loading " + - s"authorizer changes in $deltaName", t) - } - } - case _ => // No ClusterMetadataAuthorizer is configured. There is nothing to do. - } - } + // Apply ACL delta. + aclPublisher.onMetadataUpdate(delta, newImage, manifest) try { // Propagate the new image to the group coordinator. diff --git a/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerTest.scala b/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerTest.scala index 76bcdeb41af..06244de7dee 100644 --- a/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerTest.scala +++ b/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerTest.scala @@ -35,8 +35,9 @@ import org.apache.kafka.common.resource.ResourceType._ import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourcePatternFilter, ResourceType} import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} import org.apache.kafka.common.utils.{Time, SecurityUtils => JSecurityUtils} +import org.apache.kafka.controller.MockAclMutator import org.apache.kafka.metadata.authorizer.StandardAuthorizerTest.AuthorizerTestServerInfo -import org.apache.kafka.metadata.authorizer.{MockAclMutator, StandardAuthorizer} +import org.apache.kafka.metadata.authorizer.StandardAuthorizer import org.apache.kafka.server.authorizer._ import org.apache.kafka.server.common.MetadataVersion import org.apache.kafka.server.common.MetadataVersion.{IBP_2_0_IV0, IBP_2_0_IV1} diff --git a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala index 766fba282db..bbffe5c51ea 100644 --- a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala +++ b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala @@ -289,7 +289,7 @@ class BrokerMetadataPublisherTest { mock(classOf[DynamicConfigPublisher]), mock(classOf[DynamicClientQuotaPublisher]), mock(classOf[ScramPublisher]), - None, + mock(classOf[AclPublisher]), faultHandler, faultHandler ) diff --git a/metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java index ff9101e356d..7cd6a948ab8 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java @@ -27,10 +27,8 @@ import org.apache.kafka.common.metadata.AccessControlEntryRecord; import org.apache.kafka.common.metadata.RemoveAccessControlEntryRecord; import org.apache.kafka.common.requests.ApiError; import org.apache.kafka.common.utils.LogContext; -import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer; import org.apache.kafka.metadata.authorizer.StandardAcl; import org.apache.kafka.metadata.authorizer.StandardAclWithId; -import org.apache.kafka.raft.OffsetAndEpoch; import org.apache.kafka.server.authorizer.AclCreateResult; import org.apache.kafka.server.authorizer.AclDeleteResult; import org.apache.kafka.server.authorizer.AclDeleteResult.AclBindingDeleteResult; @@ -48,7 +46,6 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -58,22 +55,12 @@ import static org.apache.kafka.controller.QuorumController.MAX_RECORDS_PER_USER_ /** * The AclControlManager manages any ACLs that are stored in the __cluster_metadata topic. * If the ACLs are stored externally (such as in ZooKeeper) then there will be nothing for - * this manager to do, and the authorizer field will always be Optional.empty. - * - * Because the Authorizer is being concurrently used by other threads, we need to be - * careful about snapshots. We don't want the Authorizer to act based on partial state - * during the loading process. Therefore, unlike most of the other managers, - * AclControlManager needs to receive callbacks when we start loading a snapshot and when - * we finish. The prepareForSnapshotLoad callback clears the authorizer field, preventing - * any changes from affecting the authorizer until completeSnapshotLoad is called. - * Note that the Authorizer's start() method will block until the first snapshot load has - * completed, which is another reason the prepare / complete callbacks are needed. + * this manager to do. */ public class AclControlManager { static class Builder { private LogContext logContext = null; private SnapshotRegistry snapshotRegistry = null; - private Optional<ClusterMetadataAuthorizer> authorizer = Optional.empty(); Builder setLogContext(LogContext logContext) { this.logContext = logContext; @@ -85,32 +72,24 @@ public class AclControlManager { return this; } - Builder setClusterMetadataAuthorizer(Optional<ClusterMetadataAuthorizer> authorizer) { - this.authorizer = authorizer; - return this; - } - AclControlManager build() { if (logContext == null) logContext = new LogContext(); if (snapshotRegistry == null) snapshotRegistry = new SnapshotRegistry(logContext); - return new AclControlManager(logContext, snapshotRegistry, authorizer); + return new AclControlManager(logContext, snapshotRegistry); } } private final Logger log; private final TimelineHashMap<Uuid, StandardAcl> idToAcl; private final TimelineHashSet<StandardAcl> existingAcls; - private final Optional<ClusterMetadataAuthorizer> authorizer; - AclControlManager( + private AclControlManager( LogContext logContext, - SnapshotRegistry snapshotRegistry, - Optional<ClusterMetadataAuthorizer> authorizer + SnapshotRegistry snapshotRegistry ) { this.log = logContext.logger(AclControlManager.class); this.idToAcl = new TimelineHashMap<>(snapshotRegistry, 0); this.existingAcls = new TimelineHashSet<>(snapshotRegistry, 0); - this.authorizer = authorizer; } ControllerResult<List<AclCreateResult>> createAcls(List<AclBinding> acls) { @@ -227,26 +206,15 @@ public class AclControlManager { } } - public void replay( - AccessControlEntryRecord record, - Optional<OffsetAndEpoch> snapshotId - ) { + public void replay(AccessControlEntryRecord record) { StandardAclWithId aclWithId = StandardAclWithId.fromRecord(record); idToAcl.put(aclWithId.id(), aclWithId.acl()); existingAcls.add(aclWithId.acl()); - if (!snapshotId.isPresent()) { - authorizer.ifPresent(a -> { - a.addAcl(aclWithId.id(), aclWithId.acl()); - }); - } log.info("Replayed AccessControlEntryRecord for {}, setting {}", record.id(), aclWithId.acl()); } - public void replay( - RemoveAccessControlEntryRecord record, - Optional<OffsetAndEpoch> snapshotId - ) { + public void replay(RemoveAccessControlEntryRecord record) { StandardAcl acl = idToAcl.remove(record.id()); if (acl == null) { throw new RuntimeException("Unable to replay " + record + ": no acl with " + @@ -256,11 +224,6 @@ public class AclControlManager { throw new RuntimeException("Unable to replay " + record + " for " + acl + ": acl not found " + "in existingAcls."); } - if (!snapshotId.isPresent()) { - authorizer.ifPresent(a -> { - a.removeAcl(record.id()); - }); - } log.info("Replayed RemoveAccessControlEntryRecord for {}, removing {}", record.id(), acl); } diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java index 5e55ffa70c4..b658af4c111 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java @@ -83,7 +83,6 @@ import org.apache.kafka.metadata.BrokerHeartbeatReply; import org.apache.kafka.metadata.BrokerRegistrationReply; import org.apache.kafka.metadata.FinalizedControllerFeatures; import org.apache.kafka.metadata.KafkaConfigSchema; -import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer; import org.apache.kafka.metadata.bootstrap.BootstrapMetadata; import org.apache.kafka.metadata.migration.ZkMigrationState; import org.apache.kafka.metadata.migration.ZkRecordConsumer; @@ -195,7 +194,6 @@ public final class QuorumController implements Controller { private Optional<CreateTopicPolicy> createTopicPolicy = Optional.empty(); private Optional<AlterConfigPolicy> alterConfigPolicy = Optional.empty(); private ConfigurationValidator configurationValidator = ConfigurationValidator.NO_OP; - private Optional<ClusterMetadataAuthorizer> authorizer = Optional.empty(); private Map<String, Object> staticConfig = Collections.emptyMap(); private BootstrapMetadata bootstrapMetadata = null; private int maxRecordsPerBatch = MAX_RECORDS_PER_BATCH; @@ -310,11 +308,6 @@ public final class QuorumController implements Controller { return this; } - public Builder setAuthorizer(ClusterMetadataAuthorizer authorizer) { - this.authorizer = Optional.of(authorizer); - return this; - } - public Builder setStaticConfig(Map<String, Object> staticConfig) { this.staticConfig = staticConfig; return this; @@ -373,7 +366,6 @@ public final class QuorumController implements Controller { createTopicPolicy, alterConfigPolicy, configurationValidator, - authorizer, staticConfig, bootstrapMetadata, maxRecordsPerBatch, @@ -965,7 +957,6 @@ public final class QuorumController implements Controller { public void handleCommit(BatchReader<ApiMessageAndVersion> reader) { appendRaftEvent("handleCommit[baseOffset=" + reader.baseOffset() + "]", () -> { try { - maybeCompleteAuthorizerInitialLoad(); boolean isActive = isActiveController(); while (reader.hasNext()) { Batch<ApiMessageAndVersion> batch = reader.next(); @@ -1063,7 +1054,6 @@ public final class QuorumController implements Controller { reader.lastContainedLogEpoch(), reader.lastContainedLogTimestamp()); snapshotRegistry.getOrCreateSnapshot(lastCommittedOffset); - authorizer.ifPresent(a -> a.loadSnapshot(aclControlManager.idToAcl())); } finally { reader.close(); } @@ -1112,35 +1102,12 @@ public final class QuorumController implements Controller { if (this != metaLogListener) { log.debug("Ignoring {} raft event from an old registration", name); } else { - try { - runnable.run(); - } finally { - maybeCompleteAuthorizerInitialLoad(); - } + runnable.run(); } }); } } - private void maybeCompleteAuthorizerInitialLoad() { - if (!needToCompleteAuthorizerLoad) return; - OptionalLong highWatermark = raftClient.highWatermark(); - if (highWatermark.isPresent()) { - if (lastCommittedOffset + 1 >= highWatermark.getAsLong()) { - log.info("maybeCompleteAuthorizerInitialLoad: completing authorizer " + - "initial load at last committed offset {}.", lastCommittedOffset); - authorizer.get().completeInitialLoad(); - needToCompleteAuthorizerLoad = false; - } else { - log.trace("maybeCompleteAuthorizerInitialLoad: can't proceed because " + - "lastCommittedOffset = {}, but highWatermark = {}.", - lastCommittedOffset, highWatermark.getAsLong()); - } - } else { - log.trace("maybeCompleteAuthorizerInitialLoad: highWatermark not set."); - } - } - private boolean isActiveController() { return isActiveController(curClaimEpoch); } @@ -1342,7 +1309,6 @@ public final class QuorumController implements Controller { lastCommittedEpoch + " in snapshot registry."); } snapshotRegistry.revertToSnapshot(lastCommittedOffset); - authorizer.ifPresent(a -> a.loadSnapshot(aclControlManager.idToAcl())); updateWriteOffset(-1); clusterControl.deactivate(); cancelMaybeFenceReplicas(); @@ -1572,10 +1538,10 @@ public final class QuorumController implements Controller { clusterControl.replay((BrokerRegistrationChangeRecord) message); break; case ACCESS_CONTROL_ENTRY_RECORD: - aclControlManager.replay((AccessControlEntryRecord) message, snapshotId); + aclControlManager.replay((AccessControlEntryRecord) message); break; case REMOVE_ACCESS_CONTROL_ENTRY_RECORD: - aclControlManager.replay((RemoveAccessControlEntryRecord) message, snapshotId); + aclControlManager.replay((RemoveAccessControlEntryRecord) message); break; case USER_SCRAM_CREDENTIAL_RECORD: scramControlManager.replay((UserScramCredentialRecord) message); @@ -1701,12 +1667,6 @@ public final class QuorumController implements Controller { */ private final ScramControlManager scramControlManager; - /** - * The ClusterMetadataAuthorizer, if one is configured. Note that this will still be - * Optional.empty() if an Authorizer is configured that doesn't use __cluster_metadata. - */ - private final Optional<ClusterMetadataAuthorizer> authorizer; - /** * Manages the standard ACLs in the cluster. * This must be accessed only by the event queue thread. @@ -1754,12 +1714,6 @@ public final class QuorumController implements Controller { */ private long lastCommittedTimestamp = -1; - /** - * True if we need to complete the authorizer initial load. - * This must be accessed only by the event queue thread. - */ - private boolean needToCompleteAuthorizerLoad; - /** * If we have called scheduleWrite, this is the last offset we got back from it. */ @@ -1834,7 +1788,6 @@ public final class QuorumController implements Controller { Optional<CreateTopicPolicy> createTopicPolicy, Optional<AlterConfigPolicy> alterConfigPolicy, ConfigurationValidator configurationValidator, - Optional<ClusterMetadataAuthorizer> authorizer, Map<String, Object> staticConfig, BootstrapMetadata bootstrapMetadata, int maxRecordsPerBatch, @@ -1908,12 +1861,9 @@ public final class QuorumController implements Controller { setLogContext(logContext). setSnapshotRegistry(snapshotRegistry). build(); - this.authorizer = authorizer; - authorizer.ifPresent(a -> a.setAclMutator(this)); this.aclControlManager = new AclControlManager.Builder(). setLogContext(logContext). setSnapshotRegistry(snapshotRegistry). - setClusterMetadataAuthorizer(authorizer). build(); this.logReplayTracker = new LogReplayTracker.Builder(). setLogContext(logContext). @@ -1923,7 +1873,6 @@ public final class QuorumController implements Controller { this.maxRecordsPerBatch = maxRecordsPerBatch; this.metaLogListener = new QuorumMetaLogListener(); this.curClaimEpoch = -1; - this.needToCompleteAuthorizerLoad = authorizer.isPresent(); this.zkRecordConsumer = new MigrationRecordConsumer(); this.zkMigrationEnabled = zkMigrationEnabled; this.recordRedactor = new RecordRedactor(configSchema); @@ -1931,8 +1880,8 @@ public final class QuorumController implements Controller { resetToEmptyState(); - log.info("Creating new QuorumController with clusterId {}, authorizer {}.{}", - clusterId, authorizer, zkMigrationEnabled ? " ZK migration mode is enabled." : ""); + log.info("Creating new QuorumController with clusterId {}.{}", + clusterId, zkMigrationEnabled ? " ZK migration mode is enabled." : ""); this.raftClient.register(metaLogListener); } diff --git a/metadata/src/main/java/org/apache/kafka/image/AclsDelta.java b/metadata/src/main/java/org/apache/kafka/image/AclsDelta.java index cf2bb75ddbf..15e9a69c193 100644 --- a/metadata/src/main/java/org/apache/kafka/image/AclsDelta.java +++ b/metadata/src/main/java/org/apache/kafka/image/AclsDelta.java @@ -41,7 +41,6 @@ public final class AclsDelta { private final AclsImage image; private final Map<Uuid, Optional<StandardAcl>> changes = new LinkedHashMap<>(); private final Set<StandardAcl> deleted = new HashSet<>(); - private boolean isSnapshotDelta = false; public AclsDelta(AclsImage image) { this.image = image; @@ -67,17 +66,17 @@ public final class AclsDelta { } void finishSnapshot() { - this.isSnapshotDelta = true; + for (Entry<Uuid, StandardAcl> entry : image.acls().entrySet()) { + if (!changes.containsKey(entry.getKey())) { + changes.put(entry.getKey(), Optional.empty()); + } + } } public void handleMetadataVersionChange(MetadataVersion newVersion) { // no-op } - public boolean isSnapshotDelta() { - return isSnapshotDelta; - } - public void replay(AccessControlEntryRecord record) { StandardAclWithId aclWithId = StandardAclWithId.fromRecord(record); changes.put(aclWithId.id(), Optional.of(aclWithId.acl())); @@ -105,14 +104,12 @@ public final class AclsDelta { public AclsImage apply() { Map<Uuid, StandardAcl> newAcls = new HashMap<>(); - if (!isSnapshotDelta) { - for (Entry<Uuid, StandardAcl> entry : image.acls().entrySet()) { - Optional<StandardAcl> change = changes.get(entry.getKey()); - if (change == null) { - newAcls.put(entry.getKey(), entry.getValue()); - } else if (change.isPresent()) { - newAcls.put(entry.getKey(), change.get()); - } + for (Entry<Uuid, StandardAcl> entry : image.acls().entrySet()) { + Optional<StandardAcl> change = changes.get(entry.getKey()); + if (change == null) { + newAcls.put(entry.getKey(), entry.getValue()); + } else if (change.isPresent()) { + newAcls.put(entry.getKey(), change.get()); } } for (Entry<Uuid, Optional<StandardAcl>> entry : changes.entrySet()) { @@ -127,7 +124,7 @@ public final class AclsDelta { @Override public String toString() { - return "AclsDelta(isSnapshotDelta=" + isSnapshotDelta + + return "AclsDelta(" + ", changes=" + changes.entrySet().stream(). map(e -> "" + e.getKey() + "=" + e.getValue()). collect(Collectors.joining(", ")) + ")"; diff --git a/metadata/src/test/java/org/apache/kafka/controller/AclControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/AclControlManagerTest.java index 0806455dcf3..dd6c2d15185 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/AclControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/AclControlManagerTest.java @@ -212,7 +212,7 @@ public class AclControlManagerTest { for (StandardAclWithId acl : TEST_ACLS) { AccessControlEntryRecord record = acl.toRecord(); assertTrue(loadedAcls.add(new ApiMessageAndVersion(record, (short) 0))); - manager.replay(acl.toRecord(), Optional.empty()); + manager.replay(acl.toRecord()); } // Verify that the ACLs stored in the AclControlManager match the ones we expect. @@ -241,9 +241,9 @@ public class AclControlManagerTest { AclControlManager manager = new AclControlManager.Builder().build(); MockClusterMetadataAuthorizer authorizer = new MockClusterMetadataAuthorizer(); authorizer.loadSnapshot(manager.idToAcl()); - manager.replay(StandardAclWithIdTest.TEST_ACLS.get(0).toRecord(), Optional.empty()); + manager.replay(StandardAclWithIdTest.TEST_ACLS.get(0).toRecord()); manager.replay(new RemoveAccessControlEntryRecord(). - setId(TEST_ACLS.get(0).id()), Optional.empty()); + setId(TEST_ACLS.get(0).id())); assertTrue(manager.idToAcl().isEmpty()); } diff --git a/metadata/src/test/java/org/apache/kafka/controller/MockAclControlManager.java b/metadata/src/test/java/org/apache/kafka/controller/MockAclControlManager.java deleted file mode 100644 index e14c8e2e4f5..00000000000 --- a/metadata/src/test/java/org/apache/kafka/controller/MockAclControlManager.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.controller; - -import org.apache.kafka.common.acl.AclBinding; -import org.apache.kafka.common.acl.AclBindingFilter; -import org.apache.kafka.common.metadata.AccessControlEntryRecord; -import org.apache.kafka.common.metadata.RemoveAccessControlEntryRecord; -import org.apache.kafka.common.utils.LogContext; -import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer; -import org.apache.kafka.server.authorizer.AclCreateResult; -import org.apache.kafka.server.authorizer.AclDeleteResult; -import org.apache.kafka.timeline.SnapshotRegistry; - -import java.util.List; -import java.util.Optional; - -public class MockAclControlManager extends AclControlManager { - public MockAclControlManager(LogContext logContext, - Optional<ClusterMetadataAuthorizer> authorizer) { - super(new LogContext(), new SnapshotRegistry(logContext), authorizer); - } - - public List<AclCreateResult> createAndReplayAcls(List<AclBinding> acls) { - ControllerResult<List<AclCreateResult>> createResults = createAcls(acls); - createResults.records().forEach(record -> replay((AccessControlEntryRecord) record.message(), Optional.empty())); - return createResults.response(); - } - - public List<AclDeleteResult> deleteAndReplayAcls(List<AclBindingFilter> filters) { - ControllerResult<List<AclDeleteResult>> deleteResults = deleteAcls(filters); - deleteResults.records().forEach(record -> replay((RemoveAccessControlEntryRecord) record.message(), Optional.empty())); - return deleteResults.response(); - } -} diff --git a/metadata/src/test/java/org/apache/kafka/controller/MockAclMutator.java b/metadata/src/test/java/org/apache/kafka/controller/MockAclMutator.java new file mode 100644 index 00000000000..342d164a232 --- /dev/null +++ b/metadata/src/test/java/org/apache/kafka/controller/MockAclMutator.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.acl.AclBinding; +import org.apache.kafka.common.acl.AclBindingFilter; +import org.apache.kafka.metadata.RecordTestUtils; +import org.apache.kafka.metadata.authorizer.AclMutator; +import org.apache.kafka.metadata.authorizer.StandardAcl; +import org.apache.kafka.metadata.authorizer.StandardAuthorizer; +import org.apache.kafka.server.authorizer.AclCreateResult; +import org.apache.kafka.server.authorizer.AclDeleteResult; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.CompletableFuture; + + +/** + * The MockAclMutator is a class which connects a StandardAuthorizer up to an AclControlManager. + * Normally, this connection goes through the QuorumController. However, this class just attaches + * the two directly, for the purpose of unit testing. + */ +public class MockAclMutator implements AclMutator { + private final StandardAuthorizer authorizer; + private final AclControlManager aclControl; + + public MockAclMutator( + StandardAuthorizer authorizer + ) { + this.authorizer = authorizer; + this.aclControl = new AclControlManager.Builder().build(); + } + + private void syncIdToAcl( + Map<Uuid, StandardAcl> prevIdToAcl, + Map<Uuid, StandardAcl> nextIdToAcl + ) { + for (Entry<Uuid, StandardAcl> entry : prevIdToAcl.entrySet()) { + if (!entry.getValue().equals(nextIdToAcl.get(entry.getKey()))) { + authorizer.removeAcl(entry.getKey()); + } + } + for (Entry<Uuid, StandardAcl> entry : nextIdToAcl.entrySet()) { + if (!entry.getValue().equals(prevIdToAcl.get(entry.getKey()))) { + authorizer.addAcl(entry.getKey(), entry.getValue()); + } + } + } + + @Override + public synchronized CompletableFuture<List<AclCreateResult>> createAcls( + ControllerRequestContext context, + List<AclBinding> aclBindings + ) { + Map<Uuid, StandardAcl> prevIdToAcl = new HashMap<>(aclControl.idToAcl()); + ControllerResult<List<AclCreateResult>> result = aclControl.createAcls(aclBindings); + RecordTestUtils.replayAll(aclControl, result.records()); + syncIdToAcl(prevIdToAcl, aclControl.idToAcl()); + return CompletableFuture.completedFuture(result.response()); + } + + @Override + public synchronized CompletableFuture<List<AclDeleteResult>> deleteAcls( + ControllerRequestContext context, + List<AclBindingFilter> aclBindingFilters + ) { + Map<Uuid, StandardAcl> prevIdToAcl = new HashMap<>(aclControl.idToAcl()); + ControllerResult<List<AclDeleteResult>> result = aclControl.deleteAcls(aclBindingFilters); + RecordTestUtils.replayAll(aclControl, result.records()); + syncIdToAcl(prevIdToAcl, aclControl.idToAcl()); + return CompletableFuture.completedFuture(result.response()); + } +} diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java index 261d3c91b28..bdf5e8f86e5 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java @@ -979,44 +979,6 @@ public class QuorumControllerTest { private static final Map<Integer, Long> ALL_ZERO_BROKER_EPOCHS = IntStream.of(0, 1, 2, 3).boxed().collect(Collectors.toMap(identity(), __ -> 0L)); - @Test - public void testQuorumControllerCompletesAuthorizerInitialLoad() throws Throwable { - final int numControllers = 3; - List<StandardAuthorizer> authorizers = new ArrayList<>(numControllers); - for (int i = 0; i < numControllers; i++) { - StandardAuthorizer authorizer = new StandardAuthorizer(); - authorizer.configure(Collections.emptyMap()); - authorizers.add(authorizer); - } - try ( - LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(numControllers). - setSharedLogDataInitializer(sharedLogData -> { - sharedLogData.setInitialMaxReadOffset(2); - }). - build() - ) { - logEnv.appendInitialRecords(generateTestRecords(FOO_ID, ALL_ZERO_BROKER_EPOCHS)); - logEnv.logManagers().forEach(m -> m.setMaxReadOffset(2)); - try ( - QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv). - setControllerBuilderInitializer(controllerBuilder -> { - controllerBuilder.setAuthorizer(authorizers.get(controllerBuilder.nodeId())); - }). - build() - ) { - assertInitialLoadFuturesNotComplete(authorizers); - logEnv.logManagers().get(0).setMaxReadOffset(Long.MAX_VALUE); - QuorumController active = controlEnv.activeController(); - active.unregisterBroker(ANONYMOUS_CONTEXT, 3).get(); - assertInitialLoadFuturesNotComplete(authorizers.stream().skip(1).collect(Collectors.toList())); - logEnv.logManagers().forEach(m -> m.setMaxReadOffset(Long.MAX_VALUE)); - TestUtils.waitForCondition(() -> { - return authorizers.stream().allMatch(a -> a.initialLoadFuture().isDone()); - }, "Failed to complete initial authorizer load for all controllers."); - } - } - } - @Test public void testFatalMetadataReplayErrorOnActive() throws Throwable { try ( diff --git a/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java b/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java index a1dc2ce4fbb..c39682c940f 100644 --- a/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java +++ b/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java @@ -39,7 +39,6 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Objects; -import java.util.Optional; import java.util.Set; import java.util.function.Function; import java.util.function.Supplier; @@ -70,17 +69,10 @@ public class RecordTestUtils { try { Method method = target.getClass().getMethod("replay", record.getClass(), - Optional.class); - method.invoke(target, record, Optional.empty()); - } catch (NoSuchMethodException t) { - try { - Method method = target.getClass().getMethod("replay", - record.getClass(), - long.class); - method.invoke(target, record, 0L); - } catch (NoSuchMethodException i) { - // ignore - } + long.class); + method.invoke(target, record, 0L); + } catch (NoSuchMethodException i) { + // ignore } } } catch (InvocationTargetException e) { diff --git a/metadata/src/test/java/org/apache/kafka/metadata/authorizer/MockAclMutator.java b/metadata/src/test/java/org/apache/kafka/metadata/authorizer/MockAclMutator.java deleted file mode 100644 index 188a8dc69cc..00000000000 --- a/metadata/src/test/java/org/apache/kafka/metadata/authorizer/MockAclMutator.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.metadata.authorizer; - -import org.apache.kafka.common.acl.AclBinding; -import org.apache.kafka.common.acl.AclBindingFilter; -import org.apache.kafka.common.utils.LogContext; -import org.apache.kafka.controller.ControllerRequestContext; -import org.apache.kafka.controller.MockAclControlManager; -import org.apache.kafka.server.authorizer.AclCreateResult; -import org.apache.kafka.server.authorizer.AclDeleteResult; - -import java.util.List; -import java.util.Optional; -import java.util.concurrent.CompletableFuture; -public class MockAclMutator implements AclMutator { - MockAclControlManager aclControlManager; - - public MockAclMutator(StandardAuthorizer authorizer) { - aclControlManager = createAclControlManager(authorizer); - } - - private MockAclControlManager createAclControlManager(StandardAuthorizer standardAuthorizer) { - LogContext logContext = new LogContext(); - return new MockAclControlManager(logContext, Optional.of(standardAuthorizer)); - } - - @Override - public CompletableFuture<List<AclCreateResult>> createAcls( - ControllerRequestContext context, - List<AclBinding> aclBindings - ) { - CompletableFuture<List<AclCreateResult>> future = new CompletableFuture<>(); - future.complete(aclControlManager.createAndReplayAcls(aclBindings)); - return future; - } - - @Override - public CompletableFuture<List<AclDeleteResult>> deleteAcls( - ControllerRequestContext context, - List<AclBindingFilter> aclBindingFilters - ) { - CompletableFuture<List<AclDeleteResult>> future = new CompletableFuture<>(); - future.complete(aclControlManager.deleteAndReplayAcls(aclBindingFilters)); - return future; - } -} \ No newline at end of file