This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9318b591d7a KAFKA-15318: Update the Authorizer via AclPublisher (#14169)
9318b591d7a is described below

commit 9318b591d7a57b9db1e7519986d78f0402cd5b5e
Author: Colin Patrick McCabe <cmcc...@apache.org>
AuthorDate: Wed Aug 9 23:54:46 2023 -0700

    KAFKA-15318: Update the Authorizer via AclPublisher (#14169)
    
    On the controller, move publishing ACLs to the Authorizer into a dedicated MetadataPublisher,
    AclPublisher. This publisher listens for notifications from MetadataLoader, and receives only
    committed data. This brings the controller side in line with how the broker has always worked. It
    also avoids some ugly code related to publishing directly from the QuorumController. Most important
    of all, it clears the way to implement metadata transactions without worrying about Authorizer
    state (since it will be handled by the MetadataLoader, along with other metadata image state).
    
    In AclsDelta, we can remove isSnapshotDelta. We always know when the MetadataLoader is giving us a
    snapshot. Also bring AclsDelta in line with the other delta classes, where finishSnapshot
    calculates the diff between the previous image and the next one. We don't use this delta (since we
    just apply the image directly to the authorizer) but we should have it, for consistency.
    
    Finally, change MockAclMutator to avoid the need to subclass AclControlManager.
    
    Reviewers: David Arthur <mum...@gmail.com>
---
 .../src/main/scala/kafka/server/BrokerServer.scala |  18 ++--
 .../main/scala/kafka/server/ControllerServer.scala |  19 +++-
 .../scala/kafka/server/metadata/AclPublisher.scala | 102 +++++++++++++++++++++
 .../server/metadata/BrokerMetadataPublisher.scala  |  43 +--------
 .../kafka/security/authorizer/AuthorizerTest.scala |   3 +-
 .../metadata/BrokerMetadataPublisherTest.scala     |   2 +-
 .../apache/kafka/controller/AclControlManager.java |  49 ++--------
 .../apache/kafka/controller/QuorumController.java  |  61 +-----------
 .../java/org/apache/kafka/image/AclsDelta.java     |  27 +++---
 .../kafka/controller/AclControlManagerTest.java    |   6 +-
 .../kafka/controller/MockAclControlManager.java    |  50 ----------
 .../apache/kafka/controller/MockAclMutator.java    |  92 +++++++++++++++++++
 .../kafka/controller/QuorumControllerTest.java     |  38 --------
 .../org/apache/kafka/metadata/RecordTestUtils.java |  16 +---
 .../kafka/metadata/authorizer/MockAclMutator.java  |  62 -------------
 15 files changed, 252 insertions(+), 336 deletions(-)

diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala 
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 825d9eb8c19..47f4fee59bf 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -25,7 +25,7 @@ import kafka.log.remote.RemoteLogManager
 import kafka.network.{DataPlaneAcceptor, SocketServer}
 import kafka.raft.KafkaRaftManager
 import kafka.security.CredentialProvider
-import kafka.server.metadata.{BrokerMetadataPublisher, 
ClientQuotaMetadataManager, DynamicClientQuotaPublisher, 
DynamicConfigPublisher, KRaftMetadataCache, ScramPublisher}
+import kafka.server.metadata.{AclPublisher, BrokerMetadataPublisher, 
ClientQuotaMetadataManager, DynamicClientQuotaPublisher, 
DynamicConfigPublisher, KRaftMetadataCache, ScramPublisher}
 import kafka.utils.CoreUtils
 import org.apache.kafka.clients.NetworkClient
 import org.apache.kafka.common.config.ConfigException
@@ -42,7 +42,6 @@ import org.apache.kafka.coordinator.group
 import org.apache.kafka.coordinator.group.util.SystemTimerReaper
 import org.apache.kafka.coordinator.group.{GroupCoordinator, 
GroupCoordinatorConfig, GroupCoordinatorService, RecordSerde}
 import org.apache.kafka.image.publisher.MetadataPublisher
-import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer
 import org.apache.kafka.metadata.{BrokerState, VersionRange}
 import org.apache.kafka.raft.RaftConfig
 import org.apache.kafka.server.authorizer.Authorizer
@@ -419,7 +418,12 @@ class BrokerServer(
           sharedServer.metadataPublishingFaultHandler,
           "broker",
           credentialProvider),
-        authorizer,
+        new AclPublisher(
+          config.nodeId,
+          sharedServer.metadataPublishingFaultHandler,
+          "broker",
+          authorizer
+        ),
         sharedServer.initialBrokerMetadataLoadFaultHandler,
         sharedServer.metadataPublishingFaultHandler)
       metadataPublishers.add(brokerMetadataPublisher)
@@ -468,14 +472,6 @@ class BrokerServer(
         rlm.startup()
       })
 
-      // If we are using a ClusterMetadataAuthorizer which stores its ACLs in 
the metadata log,
-      // notify it that the loading process is complete.
-      authorizer match {
-        case Some(clusterMetadataAuthorizer: ClusterMetadataAuthorizer) =>
-          clusterMetadataAuthorizer.completeInitialLoad()
-        case _ => // nothing to do
-      }
-
       // We're now ready to unfence the broker. This also allows this broker 
to transition
       // from RECOVERY state to RUNNING state, once the controller unfences 
the broker.
       FutureUtils.waitWithLogging(logger.underlying, logIdent,
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala 
b/core/src/main/scala/kafka/server/ControllerServer.scala
index 3a98fbaf587..78045a6985d 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -26,7 +26,7 @@ import 
kafka.server.KafkaConfig.{AlterConfigPolicyClassNameProp, CreateTopicPoli
 import kafka.server.QuotaFactory.QuotaManagers
 
 import scala.collection.immutable
-import kafka.server.metadata.{ClientQuotaMetadataManager, 
DynamicClientQuotaPublisher, DynamicConfigPublisher, ScramPublisher}
+import kafka.server.metadata.{AclPublisher, ClientQuotaMetadataManager, 
DynamicClientQuotaPublisher, DynamicConfigPublisher, ScramPublisher}
 import kafka.utils.{CoreUtils, Logging, PasswordEncoder}
 import kafka.zk.{KafkaZkClient, ZkMigrationClient}
 import org.apache.kafka.common.config.ConfigException
@@ -238,11 +238,14 @@ class ControllerServer(
           
setNonFatalFaultHandler(sharedServer.nonFatalQuorumControllerFaultHandler).
           setZkMigrationEnabled(config.migrationEnabled)
       }
+      controller = controllerBuilder.build()
+
+      // If we are using a ClusterMetadataAuthorizer, requests to add or 
remove ACLs must go
+      // through the controller.
       authorizer match {
-        case Some(a: ClusterMetadataAuthorizer) => 
controllerBuilder.setAuthorizer(a)
-        case _ => // nothing to do
+        case Some(a: ClusterMetadataAuthorizer) => a.setAclMutator(controller)
+        case _ =>
       }
-      controller = controllerBuilder.build()
 
       if (config.migrationEnabled) {
         val zkClient = KafkaZkClient.createZkClient("KRaft Migration", time, 
config, KafkaServer.zkClientConfigFromKafkaConfig(config))
@@ -361,6 +364,14 @@ class ControllerServer(
         sharedServer.metadataPublishingFaultHandler
       ))
 
+      // Set up the ACL publisher.
+      metadataPublishers.add(new AclPublisher(
+        config.nodeId,
+        sharedServer.metadataPublishingFaultHandler,
+        "controller",
+        authorizer
+      ))
+
       // Install all metadata publishers.
       FutureUtils.waitWithLogging(logger.underlying, logIdent,
         "the controller metadata publishers to be installed",
diff --git a/core/src/main/scala/kafka/server/metadata/AclPublisher.scala 
b/core/src/main/scala/kafka/server/metadata/AclPublisher.scala
new file mode 100644
index 00000000000..819fcc3d38d
--- /dev/null
+++ b/core/src/main/scala/kafka/server/metadata/AclPublisher.scala
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server.metadata
+
+import kafka.utils.Logging
+import org.apache.kafka.image.loader.{LoaderManifest, LoaderManifestType}
+import org.apache.kafka.image.{MetadataDelta, MetadataImage}
+import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer
+import org.apache.kafka.server.authorizer.Authorizer
+import org.apache.kafka.server.fault.FaultHandler
+
+import scala.concurrent.TimeoutException
+
+
+class AclPublisher(
+  nodeId: Int,
+  faultHandler: FaultHandler,
+  nodeType: String,
+  authorizer: Option[Authorizer],
+) extends Logging with org.apache.kafka.image.publisher.MetadataPublisher {
+  logIdent = s"[${name()}] "
+
+  override def name(): String = s"AclPublisher ${nodeType} id=${nodeId}"
+
+  var completedInitialLoad = false
+
+  override def onMetadataUpdate(
+    delta: MetadataDelta,
+    newImage: MetadataImage,
+    manifest: LoaderManifest
+  ): Unit = {
+    val deltaName = s"MetadataDelta up to ${newImage.offset()}"
+
+    // Apply changes to ACLs. This needs to be handled carefully because while 
we are
+    // applying these changes, the Authorizer is continuing to return 
authorization
+    // results in other threads. We never want to expose an invalid state. For 
example,
+    // if the user created a DENY ALL acl and then created an ALLOW ACL for 
topic foo,
+    // we want to apply those changes in that order, not the reverse order! 
Otherwise
+    // there could be a window during which incorrect authorization results 
are returned.
+    Option(delta.aclsDelta()).foreach { aclsDelta =>
+      authorizer match {
+        case Some(authorizer: ClusterMetadataAuthorizer) => if 
(manifest.`type`().equals(LoaderManifestType.SNAPSHOT)) {
+          try {
+            // If the delta resulted from a snapshot load, we want to apply 
the new changes
+            // all at once using ClusterMetadataAuthorizer#loadSnapshot. If 
this is the
+            // first snapshot load, it will also complete the futures returned 
by
+            // Authorizer#start (which we wait for before processing RPCs).
+            info(s"Loading authorizer snapshot at offset ${newImage.offset()}")
+            authorizer.loadSnapshot(newImage.acls().acls())
+          } catch {
+            case t: Throwable => faultHandler.handleFault("Error loading " +
+              s"authorizer snapshot in $deltaName", t)
+          }
+        } else {
+          try {
+            // Because the changes map is a LinkedHashMap, the deltas will be 
returned in
+            // the order they were performed.
+            aclsDelta.changes().entrySet().forEach(e =>
+              if (e.getValue.isPresent) {
+                authorizer.addAcl(e.getKey, e.getValue.get())
+              } else {
+                authorizer.removeAcl(e.getKey)
+              })
+          } catch {
+            case t: Throwable => faultHandler.handleFault("Error loading " +
+              s"authorizer changes in $deltaName", t)
+          }
+        }
+        if (!completedInitialLoad) {
+          // If we are receiving this onMetadataUpdate call, that means the 
MetadataLoader has
+          // loaded up to the local high water mark. So we complete the 
initial load, enabling
+          // the authorizer.
+          completedInitialLoad = true
+          authorizer.completeInitialLoad()
+        }
+        case _ => // No ClusterMetadataAuthorizer is configured. There is 
nothing to do.
+      }
+    }
+  }
+
+  override def close(): Unit = {
+    authorizer match {
+      case Some(authorizer: ClusterMetadataAuthorizer) => 
authorizer.completeInitialLoad(new TimeoutException)
+      case _ =>
+    }
+  }
+}
diff --git 
a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala 
b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
index e1bf2e89607..5cfc40a2dd8 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
@@ -29,8 +29,6 @@ import org.apache.kafka.coordinator.group.GroupCoordinator
 import org.apache.kafka.image.loader.LoaderManifest
 import org.apache.kafka.image.publisher.MetadataPublisher
 import org.apache.kafka.image.{MetadataDelta, MetadataImage, TopicDelta, 
TopicsImage}
-import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer
-import org.apache.kafka.server.authorizer.Authorizer
 import org.apache.kafka.server.fault.FaultHandler
 
 import java.util.concurrent.CompletableFuture
@@ -107,7 +105,7 @@ class BrokerMetadataPublisher(
   var dynamicConfigPublisher: DynamicConfigPublisher,
   dynamicClientQuotaPublisher: DynamicClientQuotaPublisher,
   scramPublisher: ScramPublisher,
-  private val _authorizer: Option[Authorizer],
+  aclPublisher: AclPublisher,
   fatalFaultHandler: FaultHandler,
   metadataPublishingFaultHandler: FaultHandler
 ) extends MetadataPublisher with Logging {
@@ -229,43 +227,8 @@ class BrokerMetadataPublisher(
       // Apply SCRAM delta.
       scramPublisher.onMetadataUpdate(delta, newImage)
 
-      // Apply changes to ACLs. This needs to be handled carefully because 
while we are
-      // applying these changes, the Authorizer is continuing to return 
authorization
-      // results in other threads. We never want to expose an invalid state. 
For example,
-      // if the user created a DENY ALL acl and then created an ALLOW ACL for 
topic foo,
-      // we want to apply those changes in that order, not the reverse order! 
Otherwise
-      // there could be a window during which incorrect authorization results 
are returned.
-      Option(delta.aclsDelta()).foreach { aclsDelta =>
-        _authorizer match {
-          case Some(authorizer: ClusterMetadataAuthorizer) => if 
(aclsDelta.isSnapshotDelta) {
-            try {
-              // If the delta resulted from a snapshot load, we want to apply 
the new changes
-              // all at once using ClusterMetadataAuthorizer#loadSnapshot. If 
this is the
-              // first snapshot load, it will also complete the futures 
returned by
-              // Authorizer#start (which we wait for before processing RPCs).
-              authorizer.loadSnapshot(newImage.acls().acls())
-            } catch {
-              case t: Throwable => 
metadataPublishingFaultHandler.handleFault("Error loading " +
-                s"authorizer snapshot in $deltaName", t)
-            }
-          } else {
-            try {
-              // Because the changes map is a LinkedHashMap, the deltas will 
be returned in
-              // the order they were performed.
-              aclsDelta.changes().entrySet().forEach(e =>
-                if (e.getValue.isPresent) {
-                  authorizer.addAcl(e.getKey, e.getValue.get())
-                } else {
-                  authorizer.removeAcl(e.getKey)
-                })
-            } catch {
-              case t: Throwable => 
metadataPublishingFaultHandler.handleFault("Error loading " +
-                s"authorizer changes in $deltaName", t)
-            }
-          }
-          case _ => // No ClusterMetadataAuthorizer is configured. There is 
nothing to do.
-        }
-      }
+      // Apply ACL delta.
+      aclPublisher.onMetadataUpdate(delta, newImage, manifest)
 
       try {
         // Propagate the new image to the group coordinator.
diff --git 
a/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerTest.scala 
b/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerTest.scala
index 76bcdeb41af..06244de7dee 100644
--- a/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerTest.scala
+++ b/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerTest.scala
@@ -35,8 +35,9 @@ import org.apache.kafka.common.resource.ResourceType._
 import org.apache.kafka.common.resource.{PatternType, ResourcePattern, 
ResourcePatternFilter, ResourceType}
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.apache.kafka.common.utils.{Time, SecurityUtils => JSecurityUtils}
+import org.apache.kafka.controller.MockAclMutator
 import 
org.apache.kafka.metadata.authorizer.StandardAuthorizerTest.AuthorizerTestServerInfo
-import org.apache.kafka.metadata.authorizer.{MockAclMutator, 
StandardAuthorizer}
+import org.apache.kafka.metadata.authorizer.StandardAuthorizer
 import org.apache.kafka.server.authorizer._
 import org.apache.kafka.server.common.MetadataVersion
 import org.apache.kafka.server.common.MetadataVersion.{IBP_2_0_IV0, 
IBP_2_0_IV1}
diff --git 
a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
 
b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
index 766fba282db..bbffe5c51ea 100644
--- 
a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
@@ -289,7 +289,7 @@ class BrokerMetadataPublisherTest {
       mock(classOf[DynamicConfigPublisher]),
       mock(classOf[DynamicClientQuotaPublisher]),
       mock(classOf[ScramPublisher]),
-      None,
+      mock(classOf[AclPublisher]),
       faultHandler,
       faultHandler
     )
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java 
b/metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java
index ff9101e356d..7cd6a948ab8 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java
@@ -27,10 +27,8 @@ import 
org.apache.kafka.common.metadata.AccessControlEntryRecord;
 import org.apache.kafka.common.metadata.RemoveAccessControlEntryRecord;
 import org.apache.kafka.common.requests.ApiError;
 import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer;
 import org.apache.kafka.metadata.authorizer.StandardAcl;
 import org.apache.kafka.metadata.authorizer.StandardAclWithId;
-import org.apache.kafka.raft.OffsetAndEpoch;
 import org.apache.kafka.server.authorizer.AclCreateResult;
 import org.apache.kafka.server.authorizer.AclDeleteResult;
 import 
org.apache.kafka.server.authorizer.AclDeleteResult.AclBindingDeleteResult;
@@ -48,7 +46,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -58,22 +55,12 @@ import static 
org.apache.kafka.controller.QuorumController.MAX_RECORDS_PER_USER_
 /**
  * The AclControlManager manages any ACLs that are stored in the 
__cluster_metadata topic.
  * If the ACLs are stored externally (such as in ZooKeeper) then there will be 
nothing for
- * this manager to do, and the authorizer field will always be Optional.empty.
- *
- * Because the Authorizer is being concurrently used by other threads, we need 
to be
- * careful about snapshots. We don't want the Authorizer to act based on 
partial state
- * during the loading process. Therefore, unlike most of the other managers,
- * AclControlManager needs to receive callbacks when we start loading a 
snapshot and when
- * we finish. The prepareForSnapshotLoad callback clears the authorizer field, 
preventing
- * any changes from affecting the authorizer until completeSnapshotLoad is 
called.
- * Note that the Authorizer's start() method will block until the first 
snapshot load has
- * completed, which is another reason the prepare / complete callbacks are 
needed.
+ * this manager to do.
  */
 public class AclControlManager {
     static class Builder {
         private LogContext logContext = null;
         private SnapshotRegistry snapshotRegistry = null;
-        private Optional<ClusterMetadataAuthorizer> authorizer = 
Optional.empty();
 
         Builder setLogContext(LogContext logContext) {
             this.logContext = logContext;
@@ -85,32 +72,24 @@ public class AclControlManager {
             return this;
         }
 
-        Builder 
setClusterMetadataAuthorizer(Optional<ClusterMetadataAuthorizer> authorizer) {
-            this.authorizer = authorizer;
-            return this;
-        }
-
         AclControlManager build() {
             if (logContext == null) logContext = new LogContext();
             if (snapshotRegistry == null) snapshotRegistry = new 
SnapshotRegistry(logContext);
-            return new AclControlManager(logContext, snapshotRegistry, 
authorizer);
+            return new AclControlManager(logContext, snapshotRegistry);
         }
     }
 
     private final Logger log;
     private final TimelineHashMap<Uuid, StandardAcl> idToAcl;
     private final TimelineHashSet<StandardAcl> existingAcls;
-    private final Optional<ClusterMetadataAuthorizer> authorizer;
 
-    AclControlManager(
+    private AclControlManager(
         LogContext logContext,
-        SnapshotRegistry snapshotRegistry,
-        Optional<ClusterMetadataAuthorizer> authorizer
+        SnapshotRegistry snapshotRegistry
     ) {
         this.log = logContext.logger(AclControlManager.class);
         this.idToAcl = new TimelineHashMap<>(snapshotRegistry, 0);
         this.existingAcls = new TimelineHashSet<>(snapshotRegistry, 0);
-        this.authorizer = authorizer;
     }
 
     ControllerResult<List<AclCreateResult>> createAcls(List<AclBinding> acls) {
@@ -227,26 +206,15 @@ public class AclControlManager {
         }
     }
 
-    public void replay(
-        AccessControlEntryRecord record,
-        Optional<OffsetAndEpoch> snapshotId
-    ) {
+    public void replay(AccessControlEntryRecord record) {
         StandardAclWithId aclWithId = StandardAclWithId.fromRecord(record);
         idToAcl.put(aclWithId.id(), aclWithId.acl());
         existingAcls.add(aclWithId.acl());
-        if (!snapshotId.isPresent()) {
-            authorizer.ifPresent(a -> {
-                a.addAcl(aclWithId.id(), aclWithId.acl());
-            });
-        }
         log.info("Replayed AccessControlEntryRecord for {}, setting {}", 
record.id(),
                 aclWithId.acl());
     }
 
-    public void replay(
-        RemoveAccessControlEntryRecord record,
-        Optional<OffsetAndEpoch> snapshotId
-    ) {
+    public void replay(RemoveAccessControlEntryRecord record) {
         StandardAcl acl = idToAcl.remove(record.id());
         if (acl == null) {
             throw new RuntimeException("Unable to replay " + record + ": no 
acl with " +
@@ -256,11 +224,6 @@ public class AclControlManager {
             throw new RuntimeException("Unable to replay " + record + " for " 
+ acl +
                 ": acl not found " + "in existingAcls.");
         }
-        if (!snapshotId.isPresent()) {
-            authorizer.ifPresent(a -> {
-                a.removeAcl(record.id());
-            });
-        }
         log.info("Replayed RemoveAccessControlEntryRecord for {}, removing 
{}", record.id(), acl);
     }
 
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java 
b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 5e55ffa70c4..b658af4c111 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -83,7 +83,6 @@ import org.apache.kafka.metadata.BrokerHeartbeatReply;
 import org.apache.kafka.metadata.BrokerRegistrationReply;
 import org.apache.kafka.metadata.FinalizedControllerFeatures;
 import org.apache.kafka.metadata.KafkaConfigSchema;
-import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer;
 import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
 import org.apache.kafka.metadata.migration.ZkMigrationState;
 import org.apache.kafka.metadata.migration.ZkRecordConsumer;
@@ -195,7 +194,6 @@ public final class QuorumController implements Controller {
         private Optional<CreateTopicPolicy> createTopicPolicy = 
Optional.empty();
         private Optional<AlterConfigPolicy> alterConfigPolicy = 
Optional.empty();
         private ConfigurationValidator configurationValidator = 
ConfigurationValidator.NO_OP;
-        private Optional<ClusterMetadataAuthorizer> authorizer = 
Optional.empty();
         private Map<String, Object> staticConfig = Collections.emptyMap();
         private BootstrapMetadata bootstrapMetadata = null;
         private int maxRecordsPerBatch = MAX_RECORDS_PER_BATCH;
@@ -310,11 +308,6 @@ public final class QuorumController implements Controller {
             return this;
         }
 
-        public Builder setAuthorizer(ClusterMetadataAuthorizer authorizer) {
-            this.authorizer = Optional.of(authorizer);
-            return this;
-        }
-
         public Builder setStaticConfig(Map<String, Object> staticConfig) {
             this.staticConfig = staticConfig;
             return this;
@@ -373,7 +366,6 @@ public final class QuorumController implements Controller {
                     createTopicPolicy,
                     alterConfigPolicy,
                     configurationValidator,
-                    authorizer,
                     staticConfig,
                     bootstrapMetadata,
                     maxRecordsPerBatch,
@@ -965,7 +957,6 @@ public final class QuorumController implements Controller {
         public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
             appendRaftEvent("handleCommit[baseOffset=" + reader.baseOffset() + 
"]", () -> {
                 try {
-                    maybeCompleteAuthorizerInitialLoad();
                     boolean isActive = isActiveController();
                     while (reader.hasNext()) {
                         Batch<ApiMessageAndVersion> batch = reader.next();
@@ -1063,7 +1054,6 @@ public final class QuorumController implements Controller 
{
                         reader.lastContainedLogEpoch(),
                         reader.lastContainedLogTimestamp());
                     snapshotRegistry.getOrCreateSnapshot(lastCommittedOffset);
-                    authorizer.ifPresent(a -> 
a.loadSnapshot(aclControlManager.idToAcl()));
                 } finally {
                     reader.close();
                 }
@@ -1112,35 +1102,12 @@ public final class QuorumController implements 
Controller {
                 if (this != metaLogListener) {
                     log.debug("Ignoring {} raft event from an old 
registration", name);
                 } else {
-                    try {
-                        runnable.run();
-                    } finally {
-                        maybeCompleteAuthorizerInitialLoad();
-                    }
+                    runnable.run();
                 }
             });
         }
     }
 
-    private void maybeCompleteAuthorizerInitialLoad() {
-        if (!needToCompleteAuthorizerLoad) return;
-        OptionalLong highWatermark = raftClient.highWatermark();
-        if (highWatermark.isPresent()) {
-            if (lastCommittedOffset + 1 >= highWatermark.getAsLong()) {
-                log.info("maybeCompleteAuthorizerInitialLoad: completing 
authorizer " +
-                    "initial load at last committed offset {}.", 
lastCommittedOffset);
-                authorizer.get().completeInitialLoad();
-                needToCompleteAuthorizerLoad = false;
-            } else {
-                log.trace("maybeCompleteAuthorizerInitialLoad: can't proceed 
because " +
-                    "lastCommittedOffset  = {}, but highWatermark = {}.",
-                    lastCommittedOffset, highWatermark.getAsLong());
-            }
-        } else {
-            log.trace("maybeCompleteAuthorizerInitialLoad: highWatermark not 
set.");
-        }
-    }
-
     private boolean isActiveController() {
         return isActiveController(curClaimEpoch);
     }
@@ -1342,7 +1309,6 @@ public final class QuorumController implements Controller 
{
                         lastCommittedEpoch + " in snapshot registry.");
             }
             snapshotRegistry.revertToSnapshot(lastCommittedOffset);
-            authorizer.ifPresent(a -> 
a.loadSnapshot(aclControlManager.idToAcl()));
             updateWriteOffset(-1);
             clusterControl.deactivate();
             cancelMaybeFenceReplicas();
@@ -1572,10 +1538,10 @@ public final class QuorumController implements 
Controller {
                 clusterControl.replay((BrokerRegistrationChangeRecord) 
message);
                 break;
             case ACCESS_CONTROL_ENTRY_RECORD:
-                aclControlManager.replay((AccessControlEntryRecord) message, 
snapshotId);
+                aclControlManager.replay((AccessControlEntryRecord) message);
                 break;
             case REMOVE_ACCESS_CONTROL_ENTRY_RECORD:
-                aclControlManager.replay((RemoveAccessControlEntryRecord) 
message, snapshotId);
+                aclControlManager.replay((RemoveAccessControlEntryRecord) 
message);
                 break;
             case USER_SCRAM_CREDENTIAL_RECORD:
                 scramControlManager.replay((UserScramCredentialRecord) 
message);
@@ -1701,12 +1667,6 @@ public final class QuorumController implements 
Controller {
      */
     private final ScramControlManager scramControlManager;
 
-    /**
-     * The ClusterMetadataAuthorizer, if one is configured. Note that this 
will still be
-     * Optional.empty() if an Authorizer is configured that doesn't use 
__cluster_metadata.
-     */
-    private final Optional<ClusterMetadataAuthorizer> authorizer;
-
     /**
      * Manages the standard ACLs in the cluster.
      * This must be accessed only by the event queue thread.
@@ -1754,12 +1714,6 @@ public final class QuorumController implements 
Controller {
      */
     private long lastCommittedTimestamp = -1;
 
-    /**
-     * True if we need to complete the authorizer initial load.
-     * This must be accessed only by the event queue thread.
-     */
-    private boolean needToCompleteAuthorizerLoad;
-
     /**
      * If we have called scheduleWrite, this is the last offset we got back 
from it.
      */
@@ -1834,7 +1788,6 @@ public final class QuorumController implements Controller 
{
         Optional<CreateTopicPolicy> createTopicPolicy,
         Optional<AlterConfigPolicy> alterConfigPolicy,
         ConfigurationValidator configurationValidator,
-        Optional<ClusterMetadataAuthorizer> authorizer,
         Map<String, Object> staticConfig,
         BootstrapMetadata bootstrapMetadata,
         int maxRecordsPerBatch,
@@ -1908,12 +1861,9 @@ public final class QuorumController implements 
Controller {
             setLogContext(logContext).
             setSnapshotRegistry(snapshotRegistry).
             build();
-        this.authorizer = authorizer;
-        authorizer.ifPresent(a -> a.setAclMutator(this));
         this.aclControlManager = new AclControlManager.Builder().
             setLogContext(logContext).
             setSnapshotRegistry(snapshotRegistry).
-            setClusterMetadataAuthorizer(authorizer).
             build();
         this.logReplayTracker = new LogReplayTracker.Builder().
             setLogContext(logContext).
@@ -1923,7 +1873,6 @@ public final class QuorumController implements Controller {
         this.maxRecordsPerBatch = maxRecordsPerBatch;
         this.metaLogListener = new QuorumMetaLogListener();
         this.curClaimEpoch = -1;
-        this.needToCompleteAuthorizerLoad = authorizer.isPresent();
         this.zkRecordConsumer = new MigrationRecordConsumer();
         this.zkMigrationEnabled = zkMigrationEnabled;
         this.recordRedactor = new RecordRedactor(configSchema);
@@ -1931,8 +1880,8 @@ public final class QuorumController implements Controller {
 
         resetToEmptyState();
 
-        log.info("Creating new QuorumController with clusterId {}, authorizer {}.{}",
-                clusterId, authorizer, zkMigrationEnabled ? " ZK migration mode is enabled." : "");
+        log.info("Creating new QuorumController with clusterId {}.{}",
+                clusterId, zkMigrationEnabled ? " ZK migration mode is enabled." : "");
 
         this.raftClient.register(metaLogListener);
     }
diff --git a/metadata/src/main/java/org/apache/kafka/image/AclsDelta.java b/metadata/src/main/java/org/apache/kafka/image/AclsDelta.java
index cf2bb75ddbf..15e9a69c193 100644
--- a/metadata/src/main/java/org/apache/kafka/image/AclsDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/AclsDelta.java
@@ -41,7 +41,6 @@ public final class AclsDelta {
     private final AclsImage image;
     private final Map<Uuid, Optional<StandardAcl>> changes = new LinkedHashMap<>();
     private final Set<StandardAcl> deleted = new HashSet<>();
-    private boolean isSnapshotDelta = false;
 
     public AclsDelta(AclsImage image) {
         this.image = image;
@@ -67,17 +66,17 @@ public final class AclsDelta {
     }
 
     void finishSnapshot() {
-        this.isSnapshotDelta = true;
+        for (Entry<Uuid, StandardAcl> entry : image.acls().entrySet()) {
+            if (!changes.containsKey(entry.getKey())) {
+                changes.put(entry.getKey(), Optional.empty());
+            }
+        }
     }
 
     public void handleMetadataVersionChange(MetadataVersion newVersion) {
         // no-op
     }
 
-    public boolean isSnapshotDelta() {
-        return isSnapshotDelta;
-    }
-
     public void replay(AccessControlEntryRecord record) {
         StandardAclWithId aclWithId = StandardAclWithId.fromRecord(record);
         changes.put(aclWithId.id(), Optional.of(aclWithId.acl()));
@@ -105,14 +104,12 @@ public final class AclsDelta {
 
     public AclsImage apply() {
         Map<Uuid, StandardAcl> newAcls = new HashMap<>();
-        if (!isSnapshotDelta) {
-            for (Entry<Uuid, StandardAcl> entry : image.acls().entrySet()) {
-                Optional<StandardAcl> change = changes.get(entry.getKey());
-                if (change == null) {
-                    newAcls.put(entry.getKey(), entry.getValue());
-                } else if (change.isPresent()) {
-                    newAcls.put(entry.getKey(), change.get());
-                }
+        for (Entry<Uuid, StandardAcl> entry : image.acls().entrySet()) {
+            Optional<StandardAcl> change = changes.get(entry.getKey());
+            if (change == null) {
+                newAcls.put(entry.getKey(), entry.getValue());
+            } else if (change.isPresent()) {
+                newAcls.put(entry.getKey(), change.get());
             }
         }
         for (Entry<Uuid, Optional<StandardAcl>> entry : changes.entrySet()) {
@@ -127,7 +124,7 @@ public final class AclsDelta {
 
     @Override
     public String toString() {
-        return "AclsDelta(isSnapshotDelta=" + isSnapshotDelta +
+        return "AclsDelta(" +
             ", changes=" + changes.entrySet().stream().
                 map(e -> "" + e.getKey() + "=" + e.getValue()).
                 collect(Collectors.joining(", ")) + ")";
diff --git a/metadata/src/test/java/org/apache/kafka/controller/AclControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/AclControlManagerTest.java
index 0806455dcf3..dd6c2d15185 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/AclControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/AclControlManagerTest.java
@@ -212,7 +212,7 @@ public class AclControlManagerTest {
         for (StandardAclWithId acl : TEST_ACLS) {
             AccessControlEntryRecord record = acl.toRecord();
             assertTrue(loadedAcls.add(new ApiMessageAndVersion(record, (short) 0)));
-            manager.replay(acl.toRecord(), Optional.empty());
+            manager.replay(acl.toRecord());
         }
 
         // Verify that the ACLs stored in the AclControlManager match the ones we expect.
@@ -241,9 +241,9 @@ public class AclControlManagerTest {
         AclControlManager manager = new AclControlManager.Builder().build();
         MockClusterMetadataAuthorizer authorizer = new MockClusterMetadataAuthorizer();
         authorizer.loadSnapshot(manager.idToAcl());
-        manager.replay(StandardAclWithIdTest.TEST_ACLS.get(0).toRecord(), Optional.empty());
+        manager.replay(StandardAclWithIdTest.TEST_ACLS.get(0).toRecord());
         manager.replay(new RemoveAccessControlEntryRecord().
-            setId(TEST_ACLS.get(0).id()), Optional.empty());
+            setId(TEST_ACLS.get(0).id()));
         assertTrue(manager.idToAcl().isEmpty());
     }
 
diff --git a/metadata/src/test/java/org/apache/kafka/controller/MockAclControlManager.java b/metadata/src/test/java/org/apache/kafka/controller/MockAclControlManager.java
deleted file mode 100644
index e14c8e2e4f5..00000000000
--- a/metadata/src/test/java/org/apache/kafka/controller/MockAclControlManager.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.controller;
-
-import org.apache.kafka.common.acl.AclBinding;
-import org.apache.kafka.common.acl.AclBindingFilter;
-import org.apache.kafka.common.metadata.AccessControlEntryRecord;
-import org.apache.kafka.common.metadata.RemoveAccessControlEntryRecord;
-import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer;
-import org.apache.kafka.server.authorizer.AclCreateResult;
-import org.apache.kafka.server.authorizer.AclDeleteResult;
-import org.apache.kafka.timeline.SnapshotRegistry;
-
-import java.util.List;
-import java.util.Optional;
-
-public class MockAclControlManager extends AclControlManager {
-    public MockAclControlManager(LogContext logContext,
-                                 Optional<ClusterMetadataAuthorizer> authorizer) {
-        super(new LogContext(), new SnapshotRegistry(logContext), authorizer);
-    }
-
-    public List<AclCreateResult> createAndReplayAcls(List<AclBinding> acls) {
-        ControllerResult<List<AclCreateResult>> createResults = createAcls(acls);
-        createResults.records().forEach(record -> replay((AccessControlEntryRecord) record.message(), Optional.empty()));
-        return createResults.response();
-    }
-
-    public List<AclDeleteResult> deleteAndReplayAcls(List<AclBindingFilter> filters) {
-        ControllerResult<List<AclDeleteResult>> deleteResults = deleteAcls(filters);
-        deleteResults.records().forEach(record -> replay((RemoveAccessControlEntryRecord) record.message(), Optional.empty()));
-        return deleteResults.response();
-    }
-}
diff --git a/metadata/src/test/java/org/apache/kafka/controller/MockAclMutator.java b/metadata/src/test/java/org/apache/kafka/controller/MockAclMutator.java
new file mode 100644
index 00000000000..342d164a232
--- /dev/null
+++ b/metadata/src/test/java/org/apache/kafka/controller/MockAclMutator.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.metadata.RecordTestUtils;
+import org.apache.kafka.metadata.authorizer.AclMutator;
+import org.apache.kafka.metadata.authorizer.StandardAcl;
+import org.apache.kafka.metadata.authorizer.StandardAuthorizer;
+import org.apache.kafka.server.authorizer.AclCreateResult;
+import org.apache.kafka.server.authorizer.AclDeleteResult;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.CompletableFuture;
+
+
+/**
+ * The MockAclMutator is a class which connects a StandardAuthorizer up to an AclControlManager.
+ * Normally, this connection goes through the QuorumController. However, this class just attaches
+ * the two directly, for the purpose of unit testing.
+ */
+public class MockAclMutator implements AclMutator {
+    private final StandardAuthorizer authorizer;
+    private final AclControlManager aclControl;
+
+    public MockAclMutator(
+        StandardAuthorizer authorizer
+    ) {
+        this.authorizer = authorizer;
+        this.aclControl = new AclControlManager.Builder().build();
+    }
+
+    private void syncIdToAcl(
+        Map<Uuid, StandardAcl> prevIdToAcl,
+        Map<Uuid, StandardAcl> nextIdToAcl
+    ) {
+        for (Entry<Uuid, StandardAcl> entry : prevIdToAcl.entrySet()) {
+            if (!entry.getValue().equals(nextIdToAcl.get(entry.getKey()))) {
+                authorizer.removeAcl(entry.getKey());
+            }
+        }
+        for (Entry<Uuid, StandardAcl> entry : nextIdToAcl.entrySet()) {
+            if (!entry.getValue().equals(prevIdToAcl.get(entry.getKey()))) {
+                authorizer.addAcl(entry.getKey(), entry.getValue());
+            }
+        }
+    }
+
+    @Override
+    public synchronized CompletableFuture<List<AclCreateResult>> createAcls(
+        ControllerRequestContext context,
+        List<AclBinding> aclBindings
+    ) {
+        Map<Uuid, StandardAcl> prevIdToAcl = new HashMap<>(aclControl.idToAcl());
+        ControllerResult<List<AclCreateResult>> result = aclControl.createAcls(aclBindings);
+        RecordTestUtils.replayAll(aclControl, result.records());
+        syncIdToAcl(prevIdToAcl, aclControl.idToAcl());
+        return CompletableFuture.completedFuture(result.response());
+    }
+
+    @Override
+    public synchronized CompletableFuture<List<AclDeleteResult>> deleteAcls(
+        ControllerRequestContext context,
+        List<AclBindingFilter> aclBindingFilters
+    ) {
+        Map<Uuid, StandardAcl> prevIdToAcl = new HashMap<>(aclControl.idToAcl());
+        ControllerResult<List<AclDeleteResult>> result = aclControl.deleteAcls(aclBindingFilters);
+        RecordTestUtils.replayAll(aclControl, result.records());
+        syncIdToAcl(prevIdToAcl, aclControl.idToAcl());
+        return CompletableFuture.completedFuture(result.response());
+    }
+}
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index 261d3c91b28..bdf5e8f86e5 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -979,44 +979,6 @@ public class QuorumControllerTest {
     private static final Map<Integer, Long> ALL_ZERO_BROKER_EPOCHS =
         IntStream.of(0, 1, 2, 3).boxed().collect(Collectors.toMap(identity(), __ -> 0L));
 
-    @Test
-    public void testQuorumControllerCompletesAuthorizerInitialLoad() throws Throwable {
-        final int numControllers = 3;
-        List<StandardAuthorizer> authorizers = new ArrayList<>(numControllers);
-        for (int i = 0; i < numControllers; i++) {
-            StandardAuthorizer authorizer = new StandardAuthorizer();
-            authorizer.configure(Collections.emptyMap());
-            authorizers.add(authorizer);
-        }
-        try (
-            LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(numControllers).
-                setSharedLogDataInitializer(sharedLogData -> {
-                    sharedLogData.setInitialMaxReadOffset(2);
-                }).
-                build()
-        ) {
-            logEnv.appendInitialRecords(generateTestRecords(FOO_ID, ALL_ZERO_BROKER_EPOCHS));
-            logEnv.logManagers().forEach(m -> m.setMaxReadOffset(2));
-            try (
-                QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).
-                    setControllerBuilderInitializer(controllerBuilder -> {
-                        controllerBuilder.setAuthorizer(authorizers.get(controllerBuilder.nodeId()));
-                    }).
-                    build()
-            ) {
-                assertInitialLoadFuturesNotComplete(authorizers);
-                logEnv.logManagers().get(0).setMaxReadOffset(Long.MAX_VALUE);
-                QuorumController active = controlEnv.activeController();
-                active.unregisterBroker(ANONYMOUS_CONTEXT, 3).get();
-                assertInitialLoadFuturesNotComplete(authorizers.stream().skip(1).collect(Collectors.toList()));
-                logEnv.logManagers().forEach(m -> m.setMaxReadOffset(Long.MAX_VALUE));
-                TestUtils.waitForCondition(() -> {
-                    return authorizers.stream().allMatch(a -> a.initialLoadFuture().isDone());
-                }, "Failed to complete initial authorizer load for all controllers.");
-            }
-        }
-    }
-
     @Test
     public void testFatalMetadataReplayErrorOnActive() throws Throwable {
         try (
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java b/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java
index a1dc2ce4fbb..c39682c940f 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java
@@ -39,7 +39,6 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
-import java.util.Optional;
 import java.util.Set;
 import java.util.function.Function;
 import java.util.function.Supplier;
@@ -70,17 +69,10 @@ public class RecordTestUtils {
                     try {
                         Method method = target.getClass().getMethod("replay",
                             record.getClass(),
-                            Optional.class);
-                        method.invoke(target, record, Optional.empty());
-                    } catch (NoSuchMethodException t) {
-                        try {
-                            Method method = target.getClass().getMethod("replay",
-                                record.getClass(),
-                                long.class);
-                            method.invoke(target, record, 0L);
-                        } catch (NoSuchMethodException i) {
-                            // ignore
-                        }
+                            long.class);
+                        method.invoke(target, record, 0L);
+                    } catch (NoSuchMethodException i) {
+                        // ignore
                     }
                 }
             } catch (InvocationTargetException e) {
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/authorizer/MockAclMutator.java b/metadata/src/test/java/org/apache/kafka/metadata/authorizer/MockAclMutator.java
deleted file mode 100644
index 188a8dc69cc..00000000000
--- a/metadata/src/test/java/org/apache/kafka/metadata/authorizer/MockAclMutator.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.metadata.authorizer;
-
-import org.apache.kafka.common.acl.AclBinding;
-import org.apache.kafka.common.acl.AclBindingFilter;
-import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.controller.ControllerRequestContext;
-import org.apache.kafka.controller.MockAclControlManager;
-import org.apache.kafka.server.authorizer.AclCreateResult;
-import org.apache.kafka.server.authorizer.AclDeleteResult;
-
-import java.util.List;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-public class MockAclMutator implements AclMutator {
-    MockAclControlManager aclControlManager;
-
-    public MockAclMutator(StandardAuthorizer authorizer) {
-        aclControlManager = createAclControlManager(authorizer);
-    }
-
-    private MockAclControlManager createAclControlManager(StandardAuthorizer standardAuthorizer) {
-        LogContext logContext = new LogContext();
-        return new MockAclControlManager(logContext, Optional.of(standardAuthorizer));
-    }
-
-    @Override
-    public CompletableFuture<List<AclCreateResult>> createAcls(
-        ControllerRequestContext context,
-        List<AclBinding> aclBindings
-    ) {
-        CompletableFuture<List<AclCreateResult>> future = new CompletableFuture<>();
-        future.complete(aclControlManager.createAndReplayAcls(aclBindings));
-        return future;
-    }
-
-    @Override
-    public CompletableFuture<List<AclDeleteResult>> deleteAcls(
-        ControllerRequestContext context,
-        List<AclBindingFilter> aclBindingFilters
-    ) {
-        CompletableFuture<List<AclDeleteResult>> future = new CompletableFuture<>();
-        future.complete(aclControlManager.deleteAndReplayAcls(aclBindingFilters));
-        return future;
-    }
-}
\ No newline at end of file

Reply via email to