This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new fa59be4e770 KAFKA-13649: Implement early.start.listeners and fix 
StandardAuthorizer loading (#11969)
fa59be4e770 is described below

commit fa59be4e770627cd34cef85986b58ad7f606928d
Author: Colin Patrick McCabe <[email protected]>
AuthorDate: Thu May 12 14:48:33 2022 -0700

    KAFKA-13649: Implement early.start.listeners and fix StandardAuthorizer 
loading (#11969)
    
    Since the StandardAuthorizer relies on the metadata log to store its ACLs, 
we need to be sure that
    we have the latest metadata before allowing the authorizer to be used. 
However, if the authorizer
    is not usable for controllers in the cluster, the latest metadata cannot be 
fetched, because
    inter-node communication cannot occur. In the initial commit which 
introduced StandardAuthorizer,
    we punted on the loading issue by allowing the authorizer to be used 
immediately. This commit fixes
    that by implementing early.start.listeners as specified in KIP-801. This 
will let superusers in
    immediately, but throw the new AuthorizerNotReadyException if 
non-superusers try to use the
    authorizer before StandardAuthorizer#completeInitialLoad is called.
    
    For the broker, we call StandardAuthorizer#completeInitialLoad immediately 
after metadata catch-up
    is complete, right before unfencing. For the controller, we call
    StandardAuthorizer#completeInitialLoad when the node has caught up to the 
high water mark of the
    cluster metadata partition.
    
    This PR refactors the SocketServer so that it creates the configured 
acceptors and processors in
its constructor, rather than requiring a call to SocketServer#startup. A new 
function,
    SocketServer#enableRequestProcessing, then starts the threads and begins 
listening on the
    configured ports. enableRequestProcessing uses an async model: we will 
start the acceptor and
    processors associated with an endpoint as soon as that endpoint's 
authorizer future is completed.
    
    Also fix a bug where the controller and broker were sharing an Authorizer 
when in co-located
    mode, which was not intended.
    
    Reviewers: Jason Gustafson <[email protected]>
---
 .../common/errors/AuthorizerNotReadyException.java |  28 ++
 .../server/authorizer/AuthorizerServerInfo.java    |   5 +
 core/src/main/scala/kafka/cluster/Broker.scala     |   6 +-
 core/src/main/scala/kafka/cluster/EndPoint.scala   |   6 +
 .../main/scala/kafka/network/SocketServer.scala    | 533 +++++++++------------
 .../kafka/server/BrokerLifecycleManager.scala      |  10 +-
 .../src/main/scala/kafka/server/BrokerServer.scala |  42 +-
 .../main/scala/kafka/server/ControllerServer.scala |  23 +-
 core/src/main/scala/kafka/server/KafkaConfig.scala |  29 +-
 core/src/main/scala/kafka/server/KafkaServer.scala |   5 +-
 .../main/scala/kafka/tools/TestRaftServer.scala    |   4 +-
 .../unit/kafka/network/SocketServerTest.scala      |  63 +--
 .../scala/unit/kafka/server/KafkaConfigTest.scala  |  48 +-
 .../apache/kafka/controller/QuorumController.java  |  40 +-
 .../authorizer/ClusterMetadataAuthorizer.java      |  12 +
 .../metadata/authorizer/StandardAuthorizer.java    |  31 +-
 .../authorizer/StandardAuthorizerData.java         |  24 +
 .../kafka/controller/AclControlManagerTest.java    |  10 +
 .../kafka/controller/QuorumControllerTest.java     |  49 ++
 .../authorizer/ClusterMetadataAuthorizerTest.java  |  10 +
 .../authorizer/StandardAuthorizerTest.java         | 170 ++++++-
 .../org/apache/kafka/metalog/LocalLogManager.java  |  41 +-
 .../kafka/metalog/LocalLogManagerTestEnv.java      |  34 +-
 .../java/org/apache/kafka/raft/RaftClient.java     |   6 +
 24 files changed, 837 insertions(+), 392 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/errors/AuthorizerNotReadyException.java
 
b/clients/src/main/java/org/apache/kafka/common/errors/AuthorizerNotReadyException.java
new file mode 100644
index 00000000000..1c110ef2143
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/common/errors/AuthorizerNotReadyException.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+/**
+ * An exception that indicates that the authorizer is not ready to receive the 
request yet.
+ */
+public class AuthorizerNotReadyException extends RetriableException {
+    private static final long serialVersionUID = 1L;
+
+    public AuthorizerNotReadyException() {
+        super();
+    }
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/server/authorizer/AuthorizerServerInfo.java
 
b/clients/src/main/java/org/apache/kafka/server/authorizer/AuthorizerServerInfo.java
index 51e23fba57f..eb03c117b6c 100644
--- 
a/clients/src/main/java/org/apache/kafka/server/authorizer/AuthorizerServerInfo.java
+++ 
b/clients/src/main/java/org/apache/kafka/server/authorizer/AuthorizerServerInfo.java
@@ -48,4 +48,9 @@ public interface AuthorizerServerInfo {
      * Returns the inter-broker endpoint. This is one of the endpoints 
returned by {@link #endpoints()}.
      */
     Endpoint interBrokerEndpoint();
+
+    /**
+     * Returns the configured early start listeners.
+     */
+    Collection<String> earlyStartListeners();
 }
diff --git a/core/src/main/scala/kafka/cluster/Broker.scala 
b/core/src/main/scala/kafka/cluster/Broker.scala
index 657d89b8fe7..9b1d741835c 100755
--- a/core/src/main/scala/kafka/cluster/Broker.scala
+++ b/core/src/main/scala/kafka/cluster/Broker.scala
@@ -35,7 +35,8 @@ object Broker {
   private[kafka] case class ServerInfo(clusterResource: ClusterResource,
                                          brokerId: Int,
                                          endpoints: util.List[Endpoint],
-                                         interBrokerEndpoint: Endpoint) 
extends AuthorizerServerInfo
+                                         interBrokerEndpoint: Endpoint,
+                                         earlyStartListeners: 
util.Set[String]) extends AuthorizerServerInfo
 
   def apply(id: Int, endPoints: Seq[EndPoint], rack: Option[String]): Broker = 
{
     new Broker(id, endPoints, rack, emptySupportedFeatures)
@@ -93,6 +94,7 @@ case class Broker(id: Int, endPoints: Seq[EndPoint], rack: 
Option[String], featu
     val clusterResource: ClusterResource = new ClusterResource(clusterId)
     val interBrokerEndpoint: Endpoint = 
endPoint(config.interBrokerListenerName).toJava
     val brokerEndpoints: util.List[Endpoint] = 
endPoints.toList.map(_.toJava).asJava
-    Broker.ServerInfo(clusterResource, id, brokerEndpoints, 
interBrokerEndpoint)
+    Broker.ServerInfo(clusterResource, id, brokerEndpoints, 
interBrokerEndpoint,
+      config.earlyStartListeners.map(_.value()).asJava)
   }
 }
diff --git a/core/src/main/scala/kafka/cluster/EndPoint.scala 
b/core/src/main/scala/kafka/cluster/EndPoint.scala
index 3e84f9ed834..89c9f5ec3d4 100644
--- a/core/src/main/scala/kafka/cluster/EndPoint.scala
+++ b/core/src/main/scala/kafka/cluster/EndPoint.scala
@@ -65,6 +65,12 @@ object EndPoint {
       case _ => throw new KafkaException(s"Unable to parse a listener name 
from $connectionString")
     }
   }
+
+  def fromJava(endpoint: JEndpoint): EndPoint =
+    new EndPoint(endpoint.host(),
+      endpoint.port(),
+      new ListenerName(endpoint.listenerName().get()),
+      endpoint.securityProtocol())
 }
 
 /**
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala 
b/core/src/main/scala/kafka/network/SocketServer.scala
index 88dfa15b3f5..e91c240415c 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -22,7 +22,6 @@ import java.net._
 import java.nio.ByteBuffer
 import java.nio.channels.{Selector => NSelector, _}
 import java.util
-import java.util.Optional
 import java.util.concurrent._
 import java.util.concurrent.atomic._
 
@@ -34,7 +33,7 @@ import kafka.network.RequestChannel.{CloseConnectionResponse, 
EndThrottlingRespo
 import kafka.network.SocketServer._
 import kafka.security.CredentialProvider
 import kafka.server.{ApiVersionManager, BrokerReconfigurable, KafkaConfig}
-import kafka.utils.Implicits._
+import org.apache.kafka.common.message.ApiMessageType.ListenerType
 import kafka.utils._
 import org.apache.kafka.common.config.ConfigException
 import org.apache.kafka.common.config.internals.QuotaConfigs
@@ -104,51 +103,34 @@ class SocketServer(val config: KafkaConfig,
 
   private[this] val nextProcessorId: AtomicInteger = new AtomicInteger(0)
   val connectionQuotas = new ConnectionQuotas(config, time, metrics)
-  private var startedProcessingRequests = false
-  private var stoppedProcessingRequests = false
 
-  // Processors are now created by each Acceptor. However to preserve 
compatibility, we need to number the processors
-  // globally, so we keep the nextProcessorId counter in SocketServer
-  def nextProcessorId(): Int = {
-    nextProcessorId.getAndIncrement()
-  }
+  /**
+   * A future which is completed once all the authorizer futures are complete.
+   */
+  private val allAuthorizerFuturesComplete = new CompletableFuture[Void]
 
   /**
-   * Starts the socket server and creates all the Acceptors and the 
Processors. The Acceptors
-   * start listening at this stage so that the bound port is known when this 
method completes
-   * even when ephemeral ports are used. Acceptors and Processors are started 
if `startProcessingRequests`
-   * is true. If not, acceptors and processors are only started when 
[[kafka.network.SocketServer#startProcessingRequests()]]
-   * is invoked. Delayed starting of acceptors and processors is used to delay 
processing client
-   * connections until server is fully initialized, e.g. to ensure that all 
credentials have been
-   * loaded before authentications are performed. Incoming connections on this 
server are processed
-   * when processors start up and invoke 
[[org.apache.kafka.common.network.Selector#poll]].
-   *
-   * @param startProcessingRequests Flag indicating whether `Processor`s must 
be started.
-   * @param controlPlaneListener    The control plane listener, or None if 
there is none.
-   * @param dataPlaneListeners      The data plane listeners.
+   * True if the SocketServer is stopped. Must be accessed under the 
SocketServer lock.
    */
-  def startup(startProcessingRequests: Boolean = true,
-              controlPlaneListener: Option[EndPoint] = 
config.controlPlaneListener,
-              dataPlaneListeners: Seq[EndPoint] = config.dataPlaneListeners): 
Unit = {
-    this.synchronized {
-      createControlPlaneAcceptorAndProcessor(controlPlaneListener)
-      createDataPlaneAcceptorsAndProcessors(dataPlaneListeners)
-      if (startProcessingRequests) {
-        this.startProcessingRequests()
-      }
-    }
+  private var stopped = false
 
+  // Socket server metrics
+  newGauge(s"${DataPlaneAcceptor.MetricPrefix}NetworkProcessorAvgIdlePercent", 
() => SocketServer.this.synchronized {
     val dataPlaneProcessors = dataPlaneAcceptors.asScala.values.flatMap(a => 
a.processors)
-    val controlPlaneProcessorOpt = controlPlaneAcceptorOpt.map(a => 
a.processors(0))
-    
newGauge(s"${DataPlaneAcceptor.MetricPrefix}NetworkProcessorAvgIdlePercent", () 
=> SocketServer.this.synchronized {
-      val ioWaitRatioMetricNames = dataPlaneProcessors.map { p =>
-        metrics.metricName("io-wait-ratio", MetricsGroup, p.metricTags)
-      }
+    val ioWaitRatioMetricNames = dataPlaneProcessors.map { p =>
+      metrics.metricName("io-wait-ratio", MetricsGroup, p.metricTags)
+    }
+    if (dataPlaneProcessors.isEmpty) {
+      1.0
+    } else {
       ioWaitRatioMetricNames.map { metricName =>
         Option(metrics.metric(metricName)).fold(0.0)(m => 
Math.min(m.metricValue.asInstanceOf[Double], 1.0))
       }.sum / dataPlaneProcessors.size
-    })
+    }
+  })
+  if (config.requiresZookeeper) {
     
newGauge(s"${ControlPlaneAcceptor.MetricPrefix}NetworkProcessorAvgIdlePercent", 
() => SocketServer.this.synchronized {
+      val controlPlaneProcessorOpt = controlPlaneAcceptorOpt.map(a => 
a.processors(0))
       val ioWaitRatioMetricName = controlPlaneProcessorOpt.map { p =>
         metrics.metricName("io-wait-ratio", MetricsGroup, p.metricTags)
       }
@@ -156,17 +138,21 @@ class SocketServer(val config: KafkaConfig,
         Option(metrics.metric(metricName)).fold(0.0)(m => 
Math.min(m.metricValue.asInstanceOf[Double], 1.0))
       }.getOrElse(Double.NaN)
     })
-    newGauge("MemoryPoolAvailable", () => memoryPool.availableMemory)
-    newGauge("MemoryPoolUsed", () => memoryPool.size() - 
memoryPool.availableMemory)
-    
newGauge(s"${DataPlaneAcceptor.MetricPrefix}ExpiredConnectionsKilledCount", () 
=> SocketServer.this.synchronized {
-      val expiredConnectionsKilledCountMetricNames = dataPlaneProcessors.map { 
p =>
-        metrics.metricName("expired-connections-killed-count", MetricsGroup, 
p.metricTags)
-      }
-      expiredConnectionsKilledCountMetricNames.map { metricName =>
-        Option(metrics.metric(metricName)).fold(0.0)(m => 
m.metricValue.asInstanceOf[Double])
-      }.sum
-    })
+  }
+  newGauge("MemoryPoolAvailable", () => memoryPool.availableMemory)
+  newGauge("MemoryPoolUsed", () => memoryPool.size() - 
memoryPool.availableMemory)
+  newGauge(s"${DataPlaneAcceptor.MetricPrefix}ExpiredConnectionsKilledCount", 
() => SocketServer.this.synchronized {
+    val dataPlaneProcessors = dataPlaneAcceptors.asScala.values.flatMap(a => 
a.processors)
+    val expiredConnectionsKilledCountMetricNames = dataPlaneProcessors.map { p 
=>
+      metrics.metricName("expired-connections-killed-count", MetricsGroup, 
p.metricTags)
+    }
+    expiredConnectionsKilledCountMetricNames.map { metricName =>
+      Option(metrics.metric(metricName)).fold(0.0)(m => 
m.metricValue.asInstanceOf[Double])
+    }.sum
+  })
+  if (config.requiresZookeeper) {
     
newGauge(s"${ControlPlaneAcceptor.MetricPrefix}ExpiredConnectionsKilledCount", 
() => SocketServer.this.synchronized {
+      val controlPlaneProcessorOpt = controlPlaneAcceptorOpt.map(a => 
a.processors(0))
       val expiredConnectionsKilledCountMetricNames = 
controlPlaneProcessorOpt.map { p =>
         metrics.metricName("expired-connections-killed-count", MetricsGroup, 
p.metricTags)
       }
@@ -176,112 +162,86 @@ class SocketServer(val config: KafkaConfig,
     })
   }
 
-  /**
-   * Start processing requests and new connections. This method is used for 
delayed starting of
-   * all the acceptors and processors if 
[[kafka.network.SocketServer#startup]] was invoked with
-   * `startProcessingRequests=false`.
-   *
-   * Before starting processors for each endpoint, we ensure that authorizer 
has all the metadata
-   * to authorize requests on that endpoint by waiting on the provided future. 
We start inter-broker
-   * listener before other listeners. This allows authorization metadata for 
other listeners to be
-   * stored in Kafka topics in this cluster.
-   *
-   * @param authorizerFutures Future per [[EndPoint]] used to wait before 
starting the processor
-   *                          corresponding to the [[EndPoint]]
-   */
-  def startProcessingRequests(authorizerFutures: Map[Endpoint, 
CompletableFuture[Void]] = Map.empty): Unit = {
-    info("Starting socket server acceptors and processors")
-    this.synchronized {
-      if (!startedProcessingRequests) {
-        startControlPlaneProcessorAndAcceptor(authorizerFutures)
-        startDataPlaneProcessorsAndAcceptors(authorizerFutures)
-        startedProcessingRequests = true
-      } else {
-        info("Socket server acceptors and processors already started")
-      }
-    }
-    info("Started socket server acceptors and processors")
+  // Create acceptors and processors for the statically configured endpoints 
when the
+  // SocketServer is constructed. Note that this just opens the ports and 
creates the data
+  // structures. It does not start the acceptors and processors or their 
associated JVM
+  // threads.
+  if (apiVersionManager.listenerType.equals(ListenerType.CONTROLLER)) {
+    config.controllerListeners.foreach(createDataPlaneAcceptorAndProcessors)
+  } else {
+    config.controlPlaneListener.foreach(createControlPlaneAcceptorAndProcessor)
+    config.dataPlaneListeners.foreach(createDataPlaneAcceptorAndProcessors)
   }
 
-  /**
-   * Starts processors of the provided acceptor and the acceptor itself.
-   *
-   * Before starting them, we ensure that authorizer has all the metadata to 
authorize
-   * requests on that endpoint by waiting on the provided future.
-   */
-  private def startAcceptorAndProcessors(acceptor: Acceptor,
-                                         authorizerFutures: Map[Endpoint, 
CompletableFuture[Void]] = Map.empty): Unit = {
-    val endpoint = acceptor.endPoint
-    debug(s"Wait for authorizer to complete start up on listener 
${endpoint.listenerName}")
-    waitForAuthorizerFuture(acceptor, authorizerFutures)
-    debug(s"Start processors on listener ${endpoint.listenerName}")
-    acceptor.startProcessors()
-    debug(s"Start acceptor thread on listener ${endpoint.listenerName}")
-    if (!acceptor.isStarted()) {
-      KafkaThread.nonDaemon(
-        
s"${acceptor.threadPrefix()}-kafka-socket-acceptor-${endpoint.listenerName}-${endpoint.securityProtocol}-${endpoint.port}",
-        acceptor
-      ).start()
-      acceptor.awaitStartup()
-    }
-    info(s"Started ${acceptor.threadPrefix()} acceptor and processor(s) for 
endpoint : ${endpoint.listenerName}")
+  // Processors are now created by each Acceptor. However to preserve 
compatibility, we need to number the processors
+  // globally, so we keep the nextProcessorId counter in SocketServer
+  def nextProcessorId(): Int = {
+    nextProcessorId.getAndIncrement()
   }
 
   /**
-   * Starts processors of all the data-plane acceptors and all the acceptors 
of this server.
+   * This method enables request processing for all endpoints managed by this 
SocketServer. Each
+   * endpoint will be brought up asynchronously as soon as its associated 
future is completed.
+   * Therefore, we do not know that any particular request processor will be 
running by the end of
+   * this function -- just that it might be running.
    *
-   * We start inter-broker listener before other listeners. This allows 
authorization metadata for
-   * other listeners to be stored in Kafka topics in this cluster.
+   * @param authorizerFutures     Future per [[EndPoint]] used to wait before 
starting the
+   *                              processor corresponding to the [[EndPoint]]. 
Any endpoint
+   *                              that does not appear in this map will be 
started once all
+   *                              authorizerFutures are complete.
    */
-  private def startDataPlaneProcessorsAndAcceptors(authorizerFutures: 
Map[Endpoint, CompletableFuture[Void]]): Unit = {
-    val interBrokerListener = dataPlaneAcceptors.asScala.keySet
-      .find(_.listenerName == config.interBrokerListenerName)
-    val orderedAcceptors = interBrokerListener match {
-      case Some(interBrokerListener) => 
List(dataPlaneAcceptors.get(interBrokerListener)) ++
-        dataPlaneAcceptors.asScala.filter { case (k, _) => k != 
interBrokerListener }.values
-      case None => dataPlaneAcceptors.asScala.values
-    }
-    orderedAcceptors.foreach { acceptor =>
-      startAcceptorAndProcessors(acceptor, authorizerFutures)
+  def enableRequestProcessing(
+    authorizerFutures: Map[Endpoint, CompletableFuture[Void]]
+  ): Unit = this.synchronized {
+    if (stopped) {
+      throw new RuntimeException("Can't enable request processing: 
SocketServer is stopped.")
+    }
+
+    def chainAcceptorFuture(acceptor: Acceptor): Unit = {
+      // Because of ephemeral ports, we need to match acceptors to futures by 
looking at
+      // the listener name, rather than the endpoint object.
+      authorizerFutures.find {
+        case (endpoint, _) => 
acceptor.endPoint.listenerName.value().equals(endpoint.listenerName().get())
+      } match {
+        case None => chainFuture(allAuthorizerFuturesComplete, 
acceptor.startFuture)
+        case Some((_, future)) => chainFuture(future, acceptor.startFuture)
+      }
     }
-  }
 
-  /**
-   * Start the processor of control-plane acceptor and the acceptor of this 
server.
-   */
-  private def startControlPlaneProcessorAndAcceptor(authorizerFutures: 
Map[Endpoint, CompletableFuture[Void]]): Unit = {
-    controlPlaneAcceptorOpt.foreach { controlPlaneAcceptor =>
-      startAcceptorAndProcessors(controlPlaneAcceptor, authorizerFutures)
-    }
+    info("Enabling request processing.")
+    controlPlaneAcceptorOpt.foreach(chainAcceptorFuture)
+    dataPlaneAcceptors.values().forEach(chainAcceptorFuture)
+    chainFuture(CompletableFuture.allOf(authorizerFutures.values.toArray: _*),
+        allAuthorizerFuturesComplete)
   }
 
-  private def endpoints = config.listeners.map(l => l.listenerName -> l).toMap
-
-  def createDataPlaneAcceptorsAndProcessors(endpoints: Seq[EndPoint]): Unit = {
-    endpoints.foreach { endpoint =>
-      val parsedConfigs = 
config.valuesFromThisConfigWithPrefixOverride(endpoint.listenerName.configPrefix)
-      connectionQuotas.addListener(config, endpoint.listenerName)
-
-      val isPrivilegedListener = controlPlaneRequestChannelOpt.isEmpty && 
config.interBrokerListenerName == endpoint.listenerName
-
-      val dataPlaneAcceptor = createDataPlaneAcceptor(endpoint, 
isPrivilegedListener, dataPlaneRequestChannel)
-      config.addReconfigurable(dataPlaneAcceptor)
-      dataPlaneAcceptor.configure(parsedConfigs)
-      dataPlaneAcceptors.put(endpoint, dataPlaneAcceptor)
-      info(s"Created data-plane acceptor and processors for endpoint : 
${endpoint.listenerName}")
+  def createDataPlaneAcceptorAndProcessors(endpoint: EndPoint): Unit = 
synchronized {
+    if (stopped) {
+      throw new RuntimeException("Can't create new data plane acceptor and 
processors: SocketServer is stopped.")
     }
+    val parsedConfigs = 
config.valuesFromThisConfigWithPrefixOverride(endpoint.listenerName.configPrefix)
+    connectionQuotas.addListener(config, endpoint.listenerName)
+    val isPrivilegedListener = controlPlaneRequestChannelOpt.isEmpty &&
+      config.interBrokerListenerName == endpoint.listenerName
+    val dataPlaneAcceptor = createDataPlaneAcceptor(endpoint, 
isPrivilegedListener, dataPlaneRequestChannel)
+    config.addReconfigurable(dataPlaneAcceptor)
+    dataPlaneAcceptor.configure(parsedConfigs)
+    dataPlaneAcceptors.put(endpoint, dataPlaneAcceptor)
+    info(s"Created data-plane acceptor and processors for endpoint : 
${endpoint.listenerName}")
   }
 
-  private def createControlPlaneAcceptorAndProcessor(endpointOpt: 
Option[EndPoint]): Unit = {
-    endpointOpt.foreach { endpoint =>
-      connectionQuotas.addListener(config, endpoint.listenerName)
-      val controlPlaneAcceptor = createControlPlaneAcceptor(endpoint, 
controlPlaneRequestChannelOpt.get)
-      controlPlaneAcceptor.addProcessors(1)
-      controlPlaneAcceptorOpt = Some(controlPlaneAcceptor)
-      info(s"Created control-plane acceptor and processor for endpoint : 
${endpoint.listenerName}")
+  private def createControlPlaneAcceptorAndProcessor(endpoint: EndPoint): Unit 
= synchronized {
+    if (stopped) {
+      throw new RuntimeException("Can't create new control plane acceptor and 
processor: SocketServer is stopped.")
     }
+    connectionQuotas.addListener(config, endpoint.listenerName)
+    val controlPlaneAcceptor = createControlPlaneAcceptor(endpoint, 
controlPlaneRequestChannelOpt.get)
+    controlPlaneAcceptor.addProcessors(1)
+    controlPlaneAcceptorOpt = Some(controlPlaneAcceptor)
+    info(s"Created control-plane acceptor and processor for endpoint : 
${endpoint.listenerName}")
   }
 
+  private def endpoints = config.listeners.map(l => l.listenerName -> l).toMap
 
   protected def createDataPlaneAcceptor(endPoint: EndPoint, 
isPrivilegedListener: Boolean, requestChannel: RequestChannel): 
DataPlaneAcceptor = {
     new DataPlaneAcceptor(this, endPoint, config, nodeId, connectionQuotas, 
time, isPrivilegedListener, requestChannel, metrics, credentialProvider, 
logContext, memoryPool, apiVersionManager)
@@ -294,18 +254,18 @@ class SocketServer(val config: KafkaConfig,
   /**
    * Stop processing requests and new connections.
    */
-  def stopProcessingRequests(): Unit = {
-    info("Stopping socket server request processors")
-    this.synchronized {
-      dataPlaneAcceptors.asScala.values.foreach(_.initiateShutdown())
-      dataPlaneAcceptors.asScala.values.foreach(_.awaitShutdown())
-      controlPlaneAcceptorOpt.foreach(_.initiateShutdown())
-      controlPlaneAcceptorOpt.foreach(_.awaitShutdown())
+  def stopProcessingRequests(): Unit = synchronized {
+    if (!stopped) {
+      stopped = true
+      info("Stopping socket server request processors")
+      dataPlaneAcceptors.asScala.values.foreach(_.beginShutdown())
+      controlPlaneAcceptorOpt.foreach(_.beginShutdown())
+      dataPlaneAcceptors.asScala.values.foreach(_.close())
+      controlPlaneAcceptorOpt.foreach(_.close())
       dataPlaneRequestChannel.clear()
       controlPlaneRequestChannelOpt.foreach(_.clear())
-      stoppedProcessingRequests = true
+      info("Stopped socket server request processors")
     }
-    info("Stopped socket server request processors")
   }
 
   /**
@@ -314,9 +274,10 @@ class SocketServer(val config: KafkaConfig,
    */
   def shutdown(): Unit = {
     info("Shutting down socket server")
+    allAuthorizerFuturesComplete.completeExceptionally(new 
TimeoutException("The socket " +
+      "server was shut down before the Authorizer could be completely 
initialized."))
     this.synchronized {
-      if (!stoppedProcessingRequests)
-        stopProcessingRequests()
+      stopProcessingRequests()
       dataPlaneRequestChannel.shutdown()
       controlPlaneRequestChannelOpt.foreach(_.shutdown())
       connectionQuotas.close()
@@ -338,12 +299,20 @@ class SocketServer(val config: KafkaConfig,
     }
   }
 
+  /**
+   * This method is called to dynamically add listeners.
+   */
   def addListeners(listenersAdded: Seq[EndPoint]): Unit = synchronized {
+    if (stopped) {
+      throw new RuntimeException("can't add new listeners: SocketServer is 
stopped.")
+    }
     info(s"Adding data-plane listeners for endpoints $listenersAdded")
-    createDataPlaneAcceptorsAndProcessors(listenersAdded)
     listenersAdded.foreach { endpoint =>
+      createDataPlaneAcceptorAndProcessors(endpoint)
       val acceptor = dataPlaneAcceptors.get(endpoint)
-      startAcceptorAndProcessors(acceptor)
+      // There is no authorizer future for this new listener endpoint. So 
start the
+      // listener once all authorizer futures are complete.
+      chainFuture(allAuthorizerFuturesComplete, acceptor.startFuture)
     }
   }
 
@@ -352,8 +321,8 @@ class SocketServer(val config: KafkaConfig,
     listenersRemoved.foreach { endpoint =>
       connectionQuotas.removeListener(config, endpoint.listenerName)
       dataPlaneAcceptors.asScala.remove(endpoint).foreach { acceptor =>
-        acceptor.initiateShutdown()
-        acceptor.awaitShutdown()
+        acceptor.beginShutdown()
+        acceptor.close()
       }
     }
   }
@@ -387,15 +356,6 @@ class SocketServer(val config: KafkaConfig,
     }
   }
 
-  private def waitForAuthorizerFuture(acceptor: Acceptor,
-                                      authorizerFutures: Map[Endpoint, 
CompletableFuture[Void]]): Unit = {
-    //we can't rely on authorizerFutures.get() due to ephemeral ports. Get the 
future using listener name
-    authorizerFutures.forKeyValue { (endpoint, future) =>
-      if (endpoint.listenerName == 
Optional.of(acceptor.endPoint.listenerName.value))
-        future.join()
-    }
-  }
-
   // For test usage
   private[network] def connectionCount(address: InetAddress): Int =
     Option(connectionQuotas).fold(0)(_.get(address))
@@ -420,80 +380,22 @@ object SocketServer {
     KafkaConfig.MaxConnectionCreationRateProp)
 
   val ListenerReconfigurableConfigs = Set(KafkaConfig.MaxConnectionsProp, 
KafkaConfig.MaxConnectionCreationRateProp)
-}
 
-/**
- * A base class with some helper variables and methods
- */
-private[kafka] abstract class AbstractServerThread(connectionQuotas: 
ConnectionQuotas) extends Runnable with Logging {
-
-  private val startupLatch = new CountDownLatch(1)
-
-  // `shutdown()` is invoked before `startupComplete` and `shutdownComplete` 
if an exception is thrown in the constructor
-  // (e.g. if the address is already in use). We want `shutdown` to proceed in 
such cases, so we first assign an open
-  // latch and then replace it in `startupComplete()`.
-  @volatile private var shutdownLatch = new CountDownLatch(0)
-
-  private val alive = new AtomicBoolean(true)
-
-  def wakeup(): Unit
-
-  /**
-   * Initiates a graceful shutdown by signaling to stop
-   */
-  def initiateShutdown(): Unit = {
-    if (alive.getAndSet(false))
-      wakeup()
+  def closeSocket(
+    channel: SocketChannel,
+    logging: Logging
+  ): Unit = {
+    CoreUtils.swallow(channel.socket().close(), logging, Level.ERROR)
+    CoreUtils.swallow(channel.close(), logging, Level.ERROR)
   }
 
-  /**
-   * Wait for the thread to completely shutdown
-   */
-  def awaitShutdown(): Unit = shutdownLatch.await
-
-  /**
-   * Returns true if the thread is completely started
-   */
-  def isStarted(): Boolean = startupLatch.getCount == 0
-
-  /**
-   * Wait for the thread to completely start up
-   */
-  def awaitStartup(): Unit = startupLatch.await
-
-  /**
-   * Record that the thread startup is complete
-   */
-  protected def startupComplete(): Unit = {
-    // Replace the open latch with a closed one
-    shutdownLatch = new CountDownLatch(1)
-    startupLatch.countDown()
-  }
-
-  /**
-   * Record that the thread shutdown is complete
-   */
-  protected def shutdownComplete(): Unit = shutdownLatch.countDown()
-
-  /**
-   * Is the server still running?
-   */
-  protected def isRunning: Boolean = alive.get
-
-  /**
-   * Close `channel` and decrement the connection count.
-   */
-  def close(listenerName: ListenerName, channel: SocketChannel): Unit = {
-    if (channel != null) {
-      debug(s"Closing connection from 
${channel.socket.getRemoteSocketAddress}")
-      connectionQuotas.dec(listenerName, channel.socket.getInetAddress)
-      closeSocket(channel)
-    }
-  }
-
-  protected def closeSocket(channel: SocketChannel): Unit = {
-    CoreUtils.swallow(channel.socket().close(), this, Level.ERROR)
-    CoreUtils.swallow(channel.close(), this, Level.ERROR)
+  def chainFuture(sourceFuture: CompletableFuture[Void],
+                  destinationFuture: CompletableFuture[Void]): Unit = {
+    sourceFuture.whenComplete((_, t) => if (t != null) {
+      destinationFuture.completeExceptionally(t)
+    } else {
+      destinationFuture.complete(null)
+    })
   }
 }
 
@@ -650,7 +552,7 @@ private[kafka] abstract class Acceptor(val socketServer: 
SocketServer,
                                        val endPoint: EndPoint,
                                        var config: KafkaConfig,
                                        nodeId: Int,
-                                       connectionQuotas: ConnectionQuotas,
+                                       val connectionQuotas: ConnectionQuotas,
                                        time: Time,
                                        isPrivilegedListener: Boolean,
                                        requestChannel: RequestChannel,
@@ -659,7 +561,9 @@ private[kafka] abstract class Acceptor(val socketServer: 
SocketServer,
                                        logContext: LogContext,
                                        memoryPool: MemoryPool,
                                        apiVersionManager: ApiVersionManager)
-  extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
+  extends Runnable with Logging with KafkaMetricsGroup {
+
+  val shouldRun = new AtomicBoolean(true)
 
   def metricPrefix(): String
   def threadPrefix(): String
@@ -671,7 +575,6 @@ private[kafka] abstract class Acceptor(val socketServer: 
SocketServer,
   private val nioSelector = NSelector.open()
   private[network] val serverChannel = openServerSocket(endPoint.host, 
endPoint.port, listenBacklogSize)
   private[network] val processors = new ArrayBuffer[Processor]()
-  private val processorsStarted = new AtomicBoolean
   // Build the metric name explicitly in order to keep the existing name for 
compatibility
   private val blockedPercentMeterMetricName = explicitMetricName(
     "kafka.network",
@@ -681,24 +584,27 @@ private[kafka] abstract class Acceptor(val socketServer: 
SocketServer,
   private val blockedPercentMeter = 
newMeter(blockedPercentMeterMetricName,"blocked time", TimeUnit.NANOSECONDS)
   private var currentProcessorIndex = 0
   private[network] val throttledSockets = new 
mutable.PriorityQueue[DelayedCloseSocket]()
+  private var started = false
+  private[network] val startFuture = new CompletableFuture[Void]()
 
-  private[network] case class DelayedCloseSocket(socket: SocketChannel, 
endThrottleTimeMs: Long) extends Ordered[DelayedCloseSocket] {
-    override def compare(that: DelayedCloseSocket): Int = endThrottleTimeMs 
compare that.endThrottleTimeMs
-  }
+  val thread = KafkaThread.nonDaemon(
+    
s"${threadPrefix()}-kafka-socket-acceptor-${endPoint.listenerName}-${endPoint.securityProtocol}-${endPoint.port}",
+    this)
 
-  private[network] def startProcessors(): Unit = synchronized {
-    if (!processorsStarted.getAndSet(true)) {
-      startProcessors(processors)
+  startFuture.thenRun(() => synchronized {
+    if (!shouldRun.get()) {
+      debug(s"Ignoring start future for ${endPoint.listenerName} since the 
acceptor has already been shut down.")
+    } else {
+      debug(s"Starting processors for listener ${endPoint.listenerName}")
+      started = true
+      processors.foreach(_.start())
+      debug(s"Starting acceptor thread for listener ${endPoint.listenerName}")
+      thread.start()
     }
-  }
+  })
 
-  private def startProcessors(processors: Seq[Processor]): Unit = synchronized 
{
-    processors.foreach { processor =>
-      KafkaThread.nonDaemon(
-        
s"${threadPrefix()}-kafka-network-thread-$nodeId-${endPoint.listenerName}-${endPoint.securityProtocol}-${processor.id}",
-        processor
-      ).start()
-    }
+  private[network] case class DelayedCloseSocket(socket: SocketChannel, 
endThrottleTimeMs: Long) extends Ordered[DelayedCloseSocket] {
+    override def compare(that: DelayedCloseSocket): Int = endThrottleTimeMs 
compare that.endThrottleTimeMs
   }
 
   private[network] def removeProcessors(removeCount: Int): Unit = synchronized 
{
@@ -707,33 +613,34 @@ private[kafka] abstract class Acceptor(val socketServer: 
SocketServer,
     // The processors are then removed from `requestChannel` and any pending 
responses to these processors are dropped.
     val toRemove = processors.takeRight(removeCount)
     processors.remove(processors.size - removeCount, removeCount)
-    toRemove.foreach(_.initiateShutdown())
-    toRemove.foreach(_.awaitShutdown())
+    toRemove.foreach(_.close())
     toRemove.foreach(processor => requestChannel.removeProcessor(processor.id))
   }
 
-  override def initiateShutdown(): Unit = {
-    super.initiateShutdown()
-    synchronized {
-      processors.foreach(_.initiateShutdown())
+  def beginShutdown(): Unit = {
+    if (shouldRun.getAndSet(false)) {
+      wakeup()
+      synchronized {
+        processors.foreach(_.beginShutdown())
+      }
     }
   }
 
-  override def awaitShutdown(): Unit = {
-    super.awaitShutdown()
+  def close(): Unit = {
+    beginShutdown()
+    thread.join()
     synchronized {
-      processors.foreach(_.awaitShutdown())
+      processors.foreach(_.close())
     }
   }
 
   /**
    * Accept loop that checks for new connection attempts
    */
-  def run(): Unit = {
+  override def run(): Unit = {
     serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
-    startupComplete()
     try {
-      while (isRunning) {
+      while (shouldRun.get()) {
         try {
           acceptNewConnections()
           closeThrottledConnections()
@@ -750,9 +657,8 @@ private[kafka] abstract class Acceptor(val socketServer: 
SocketServer,
       debug("Closing server socket, selector, and any throttled sockets.")
       CoreUtils.swallow(serverChannel.close(), this, Level.ERROR)
       CoreUtils.swallow(nioSelector.close(), this, Level.ERROR)
-      throttledSockets.foreach(throttledSocket => 
closeSocket(throttledSocket.socket))
+      throttledSockets.foreach(throttledSocket => 
closeSocket(throttledSocket.socket, this))
       throttledSockets.clear()
-      shutdownComplete()
     }
   }
 
@@ -788,7 +694,7 @@ private[kafka] abstract class Acceptor(val socketServer: 
SocketServer,
     if (ready > 0) {
       val keys = nioSelector.selectedKeys()
       val iter = keys.iterator()
-      while (iter.hasNext && isRunning) {
+      while (iter.hasNext && shouldRun.get()) {
         try {
           val key = iter.next
           iter.remove()
@@ -833,7 +739,7 @@ private[kafka] abstract class Acceptor(val socketServer: 
SocketServer,
     } catch {
       case e: TooManyConnectionsException =>
         info(s"Rejected connection from ${e.ip}, address already has the 
configured maximum of ${e.count} connections.")
-        close(endPoint.listenerName, socketChannel)
+        connectionQuotas.closeChannel(this, endPoint.listenerName, 
socketChannel)
         None
       case e: ConnectionThrottledException =>
         val ip = socketChannel.socket.getInetAddress
@@ -843,7 +749,7 @@ private[kafka] abstract class Acceptor(val socketServer: 
SocketServer,
         None
       case e: IOException =>
         error(s"Encountered an error while configuring the connection, closing 
it.", e)
-        close(endPoint.listenerName, socketChannel)
+        connectionQuotas.closeChannel(this, endPoint.listenerName, 
socketChannel)
         None
     }
   }
@@ -864,7 +770,7 @@ private[kafka] abstract class Acceptor(val socketServer: 
SocketServer,
     while (throttledSockets.headOption.exists(_.endThrottleTimeMs < timeMs)) {
       val closingSocket = throttledSockets.dequeue()
       debug(s"Closing socket from ip ${closingSocket.socket.getRemoteAddress}")
-      closeSocket(closingSocket.socket)
+      closeSocket(closingSocket.socket, this)
     }
   }
 
@@ -882,10 +788,9 @@ private[kafka] abstract class Acceptor(val socketServer: 
SocketServer,
   /**
    * Wakeup the thread for selection.
    */
-  @Override
   def wakeup(): Unit = nioSelector.wakeup()
 
-  def addProcessors(toCreate: Int): Unit = {
+  def addProcessors(toCreate: Int): Unit = synchronized {
     val listenerName = endPoint.listenerName
     val securityProtocol = endPoint.securityProtocol
     val listenerProcessors = new ArrayBuffer[Processor]()
@@ -894,14 +799,16 @@ private[kafka] abstract class Acceptor(val socketServer: 
SocketServer,
       val processor = newProcessor(socketServer.nextProcessorId(), 
listenerName, securityProtocol)
       listenerProcessors += processor
       requestChannel.addProcessor(processor)
-    }
 
+      if (started) {
+        processor.start()
+      }
+    }
     processors ++= listenerProcessors
-    if (processorsStarted.get)
-      startProcessors(listenerProcessors)
   }
 
   def newProcessor(id: Int, listenerName: ListenerName, securityProtocol: 
SecurityProtocol): Processor = {
+    val name = 
s"${threadPrefix()}-kafka-network-thread-$nodeId-${endPoint.listenerName}-${endPoint.securityProtocol}-${id}"
     new Processor(id,
                   time,
                   config.socketRequestMaxBytes,
@@ -918,9 +825,9 @@ private[kafka] abstract class Acceptor(val socketServer: 
SocketServer,
                   logContext,
                   Processor.ConnectionQueueSize,
                   isPrivilegedListener,
-                  apiVersionManager)
+                  apiVersionManager,
+                  name)
   }
-
 }
 
 private[kafka] object Processor {
@@ -940,23 +847,29 @@ private[kafka] object Processor {
  *                             forwarding requests; if the control plane is 
not defined, the processor
  *                             relying on the inter broker listener would be 
acting as the privileged listener.
  */
-private[kafka] class Processor(val id: Int,
-                               time: Time,
-                               maxRequestSize: Int,
-                               requestChannel: RequestChannel,
-                               connectionQuotas: ConnectionQuotas,
-                               connectionsMaxIdleMs: Long,
-                               failedAuthenticationDelayMs: Int,
-                               listenerName: ListenerName,
-                               securityProtocol: SecurityProtocol,
-                               config: KafkaConfig,
-                               metrics: Metrics,
-                               credentialProvider: CredentialProvider,
-                               memoryPool: MemoryPool,
-                               logContext: LogContext,
-                               connectionQueueSize: Int,
-                               isPrivilegedListener: Boolean,
-                               apiVersionManager: ApiVersionManager) extends 
AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
+private[kafka] class Processor(
+  val id: Int,
+  time: Time,
+  maxRequestSize: Int,
+  requestChannel: RequestChannel,
+  connectionQuotas: ConnectionQuotas,
+  connectionsMaxIdleMs: Long,
+  failedAuthenticationDelayMs: Int,
+  listenerName: ListenerName,
+  securityProtocol: SecurityProtocol,
+  config: KafkaConfig,
+  metrics: Metrics,
+  credentialProvider: CredentialProvider,
+  memoryPool: MemoryPool,
+  logContext: LogContext,
+  connectionQueueSize: Int,
+  isPrivilegedListener: Boolean,
+  apiVersionManager: ApiVersionManager,
+  threadName: String
+) extends Runnable with KafkaMetricsGroup {
+  val shouldRun = new AtomicBoolean(true)
+
+  val thread = KafkaThread.nonDaemon(threadName, this)
 
   private object ConnectionId {
     def fromString(s: String): Option[ConnectionId] = s.split("-") match {
@@ -1036,9 +949,8 @@ private[kafka] class Processor(val id: Int,
   private var nextConnectionIndex = 0
 
   override def run(): Unit = {
-    startupComplete()
     try {
-      while (isRunning) {
+      while (shouldRun.get()) {
         try {
           // setup any new connections that have been queued up
           configureNewConnections()
@@ -1062,7 +974,6 @@ private[kafka] class Processor(val id: Int,
     } finally {
       debug(s"Closing selector - processor $id")
       CoreUtils.swallow(closeAll(), this, Level.ERROR)
-      shutdownComplete()
     }
   }
 
@@ -1325,7 +1236,7 @@ private[kafka] class Processor(val id: Int,
         case e: Throwable =>
           val remoteAddress = channel.socket.getRemoteSocketAddress
           // need to close the channel here to avoid a socket leak.
-          close(listenerName, channel)
+          connectionQuotas.closeChannel(this, listenerName, channel)
           processException(s"Processor $id closed connection from 
$remoteAddress", e)
       }
     }
@@ -1392,15 +1303,27 @@ private[kafka] class Processor(val id: Int,
   private[network] def channel(connectionId: String): Option[KafkaChannel] =
     Option(selector.channel(connectionId))
 
+  def start(): Unit = thread.start()
+
   /**
    * Wakeup the thread for selection.
    */
-  override def wakeup(): Unit = selector.wakeup()
+  def wakeup(): Unit = selector.wakeup()
 
-  override def initiateShutdown(): Unit = {
-    super.initiateShutdown()
-    removeMetric("IdlePercent", Map("networkProcessor" -> id.toString))
-    metrics.removeMetric(expiredConnectionsKilledCountMetricName)
+  def beginShutdown(): Unit = {
+    if (shouldRun.getAndSet(false)) {
+      wakeup()
+    }
+  }
+
+  def close(): Unit = {
+    try {
+      beginShutdown()
+      thread.join()
+    } finally {
+      removeMetric("IdlePercent", Map("networkProcessor" -> id.toString))
+      metrics.removeMetric(expiredConnectionsKilledCountMetricName)
+    }
   }
 }
 
@@ -1864,6 +1787,18 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, 
metrics: Metrics) extend
       sensor
     }
   }
+
+  /**
+   * Close `channel` and decrement the connection count.
+   */
+  def closeChannel(log: Logging, listenerName: ListenerName, channel: 
SocketChannel): Unit = {
+    if (channel != null) {
+      log.debug(s"Closing connection from 
${channel.socket.getRemoteSocketAddress}")
+      dec(listenerName, channel.socket.getInetAddress)
+      closeSocket(channel, log)
+    }
+  }
+
 }
 
 class TooManyConnectionsException(val ip: InetAddress, val count: Int) extends 
KafkaException(s"Too many connections from $ip (maximum = $count)")
diff --git a/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala 
b/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
index 3ca2704eb3c..39dff71ad11 100644
--- a/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
+++ b/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
@@ -97,6 +97,11 @@ class BrokerLifecycleManager(val config: KafkaConfig,
    */
   val initialCatchUpFuture = new CompletableFuture[Void]()
 
+  /**
+   * A future which is completed when the broker is unfenced for the first 
time.
+   */
+  val initialUnfenceFuture = new CompletableFuture[Void]()
+
   /**
    * A future which is completed when controlled shutdown is done.
    */
@@ -189,8 +194,9 @@ class BrokerLifecycleManager(val config: KafkaConfig,
       channelManager, clusterId, advertisedListeners, supportedFeatures))
   }
 
-  def setReadyToUnfence(): Unit = {
+  def setReadyToUnfence(): CompletableFuture[Void] = {
     eventQueue.append(new SetReadyToUnfenceEvent())
+    initialUnfenceFuture
   }
 
   def brokerEpoch: Long = _brokerEpoch
@@ -384,6 +390,7 @@ class BrokerLifecycleManager(val config: KafkaConfig,
             case BrokerState.RECOVERY =>
               if (!message.data().isFenced) {
                 info(s"The broker has been unfenced. Transitioning from 
RECOVERY to RUNNING.")
+                initialUnfenceFuture.complete(null)
                 _state = BrokerState.RUNNING
               } else {
                 info(s"The broker is in RECOVERY.")
@@ -476,6 +483,7 @@ class BrokerLifecycleManager(val config: KafkaConfig,
       _state = BrokerState.SHUTTING_DOWN
       controlledShutdownFuture.complete(null)
       initialCatchUpFuture.cancel(false)
+      initialUnfenceFuture.cancel(false)
       if (_channelManager != null) {
         _channelManager.shutdown()
         _channelManager = null
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala 
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 5447e298636..514b94bab15 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -43,6 +43,7 @@ import 
org.apache.kafka.common.security.scram.internals.ScramMechanism
 import 
org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache
 import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time, Utils}
 import org.apache.kafka.common.{ClusterResource, Endpoint}
+import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer
 import org.apache.kafka.metadata.{BrokerState, VersionRange}
 import org.apache.kafka.raft.RaftConfig.AddressSpec
 import org.apache.kafka.raft.{RaftClient, RaftConfig}
@@ -183,7 +184,9 @@ class BrokerServer(
 
       config.dynamicConfig.initialize(zkClientOpt = None)
 
-      lifecycleManager = new BrokerLifecycleManager(config, time, 
threadNamePrefix)
+      lifecycleManager = new BrokerLifecycleManager(config,
+        time,
+        threadNamePrefix)
 
       /* start scheduler */
       kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
@@ -237,7 +240,6 @@ class BrokerServer(
       // Delay starting processors until the end of the initialization 
sequence to ensure
       // that credentials have been loaded before processing authentications.
       socketServer = new SocketServer(config, metrics, time, 
credentialProvider, apiVersionManager)
-      socketServer.startup(startProcessingRequests = false)
 
       clientQuotaMetadataManager = new 
ClientQuotaMetadataManager(quotaManagers, socketServer.connectionQuotas)
 
@@ -365,10 +367,13 @@ class BrokerServer(
           endpoints.asScala.map(ep => 
ep.listenerName().orElse("(none)")).mkString(", "))
       }
       val authorizerInfo = ServerInfo(new ClusterResource(clusterId),
-        config.nodeId, endpoints, interBrokerListener)
+        config.nodeId,
+        endpoints,
+        interBrokerListener,
+        config.earlyStartListeners.map(_.value()).asJava)
 
-      /* Get the authorizer and initialize it if one is specified.*/
-      authorizer = config.authorizer
+      // Create and initialize an authorizer if one is configured.
+      authorizer = config.createNewAuthorizer()
       authorizer.foreach(_.configure(config.originals))
       val authorizerFutures: Map[Endpoint, CompletableFuture[Void]] = 
authorizer match {
         case Some(authZ) =>
@@ -432,17 +437,36 @@ class BrokerServer(
       // publish operation to complete. This first operation will initialize 
logManager,
       // replicaManager, groupCoordinator, and txnCoordinator. The log manager 
may perform
       // a potentially lengthy recovery-from-unclean-shutdown operation here, 
if required.
-      metadataListener.startPublishing(metadataPublisher).get()
+      try {
+        metadataListener.startPublishing(metadataPublisher).get()
+      } catch {
+        case t: Throwable => throw new RuntimeException("Received a fatal 
error while " +
+          "waiting for the broker to catch up with the current cluster 
metadata.", t)
+      }
 
       // Log static broker configurations.
       new KafkaConfig(config.originals(), true)
 
-      // Enable inbound TCP connections.
-      socketServer.startProcessingRequests(authorizerFutures)
+      // Enable inbound TCP connections. Each endpoint will be started only 
once its matching
+      // authorizer future is completed.
+      socketServer.enableRequestProcessing(authorizerFutures)
+
+      // If we are using a ClusterMetadataAuthorizer which stores its ACLs in 
the metadata log,
+      // notify it that the loading process is complete.
+      authorizer match {
+        case Some(clusterMetadataAuthorizer: ClusterMetadataAuthorizer) =>
+          clusterMetadataAuthorizer.completeInitialLoad()
+        case _ => // nothing to do
+      }
 
       // We're now ready to unfence the broker. This also allows this broker 
to transition
       // from RECOVERY state to RUNNING state, once the controller unfences 
the broker.
-      lifecycleManager.setReadyToUnfence()
+      try {
+        lifecycleManager.setReadyToUnfence().get()
+      } catch {
+        case t: Throwable => throw new RuntimeException("Received a fatal 
error while " +
+          "waiting for the broker to be unfenced.", t)
+      }
 
       maybeChangeStatus(STARTING, STARTED)
     } catch {
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala 
b/core/src/main/scala/kafka/server/ControllerServer.scala
index ba38aa17853..e004996bf74 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -103,7 +103,6 @@ class ControllerServer(
       info("Starting controller")
 
       maybeChangeStatus(STARTING, STARTED)
-      // TODO: initialize the log dir(s)
       this.logIdent = new LogContext(s"[ControllerServer id=${config.nodeId}] 
").logPrefix()
 
       newGauge("ClusterId", () => clusterId)
@@ -116,7 +115,7 @@ class ControllerServer(
       }
 
       val javaListeners = config.controllerListeners.map(_.toJava).asJava
-      authorizer = config.authorizer
+      authorizer = config.createNewAuthorizer()
       authorizer.foreach(_.configure(config.originals))
 
       val authorizerFutures: Map[Endpoint, CompletableFuture[Void]] = 
authorizer match {
@@ -125,7 +124,11 @@ class ControllerServer(
           // AuthorizerServerInfo, such as the assumption that there is an 
inter-broker
           // listener, or that ID is named brokerId.
           val controllerAuthorizerInfo = ServerInfo(
-            new ClusterResource(clusterId), config.nodeId, javaListeners, 
javaListeners.get(0))
+            new ClusterResource(clusterId),
+            config.nodeId,
+            javaListeners,
+            javaListeners.get(0),
+            config.earlyStartListeners.map(_.value()).asJava)
           authZ.start(controllerAuthorizerInfo).asScala.map { case (ep, cs) =>
             ep -> cs.toCompletableFuture
           }.toMap
@@ -144,7 +147,6 @@ class ControllerServer(
         time,
         credentialProvider,
         apiVersionManager)
-      socketServer.startup(startProcessingRequests = false, 
controlPlaneListener = None, config.controllerListeners)
 
       if (config.controllerListeners.nonEmpty) {
         socketServerFirstBoundPortFuture.complete(socketServer.boundPort(
@@ -213,7 +215,17 @@ class ControllerServer(
         config.numIoThreads,
         s"${DataPlaneAcceptor.MetricPrefix}RequestHandlerAvgIdlePercent",
         DataPlaneAcceptor.ThreadPrefix)
-      socketServer.startProcessingRequests(authorizerFutures)
+
+      /**
+       * Enable the controller endpoint(s). If we are using an authorizer 
which stores
+       * ACLs in the metadata log, such as StandardAuthorizer, we will be able 
to start
+       * accepting requests from principals included in super.users right after 
this point,
+       * but we will not be able to process requests from non-superusers until 
the
+       * QuorumController declares that we have caught up to the high water 
mark of the
+       * metadata log. See 
{@link QuorumController#maybeCompleteAuthorizerInitialLoad}
+       * and KIP-801 for details.
+       */
+      socketServer.enableRequestProcessing(authorizerFutures)
     } catch {
       case e: Throwable =>
         maybeChangeStatus(STARTING, STARTED)
@@ -241,6 +253,7 @@ class ControllerServer(
         CoreUtils.swallow(quotaManagers.shutdown(), this)
       if (controller != null)
         controller.close()
+      CoreUtils.swallow(authorizer.foreach(_.close()), this)
       createTopicPolicy.foreach(policy => CoreUtils.swallow(policy.close(), 
this))
       alterConfigPolicy.foreach(policy => CoreUtils.swallow(policy.close(), 
this))
       socketServerFirstBoundPortFuture.completeExceptionally(new 
RuntimeException("shutting down"))
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 7d0d3a3d852..5942b164ea4 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -405,6 +405,8 @@ object KafkaConfig {
 
   /************* Authorizer Configuration ***********/
   val AuthorizerClassNameProp = "authorizer.class.name"
+  val EarlyStartListenersProp = "early.start.listeners"
+
   /** ********* Socket Server Configuration ***********/
   val ListenersProp = "listeners"
   val AdvertisedListenersProp = "advertised.listeners"
@@ -725,7 +727,12 @@ object KafkaConfig {
 
   /************* Authorizer Configuration ***********/
   val AuthorizerClassNameDoc = s"The fully qualified name of a class that 
implements <code>${classOf[Authorizer].getName}</code>" +
-  " interface, which is used by the broker for authorization."
+    " interface, which is used by the broker for authorization."
+  val EarlyStartListenersDoc = "A comma-separated list of listener names which 
may be started before the authorizer has finished " +
+   "initialization. This is useful when the authorizer is dependent on the 
cluster itself for bootstrapping, as is the case for " +
+   "the StandardAuthorizer (which stores ACLs in the metadata log). By 
default, all listeners included in controller.listener.names " +
+   "will also be early start listeners. A listener should not appear in this 
list if it accepts external traffic."
+
   /** ********* Socket Server Configuration ***********/
   val ListenersDoc = "Listener List - Comma-separated list of URIs we will 
listen on and the listener names." +
     s" If the listener name is not a security protocol, 
<code>$ListenerSecurityProtocolMapProp</code> must also be set.\n" +
@@ -1142,6 +1149,7 @@ object KafkaConfig {
 
       /************* Authorizer Configuration ***********/
       .define(AuthorizerClassNameProp, STRING, Defaults.AuthorizerClassName, 
new ConfigDef.NonNullValidator(), LOW, AuthorizerClassNameDoc)
+      .define(EarlyStartListenersProp, STRING, null, HIGH, 
EarlyStartListenersDoc)
 
       /** ********* Socket Server Configuration ***********/
       .define(ListenersProp, STRING, Defaults.Listeners, HIGH, ListenersDoc)
@@ -1653,7 +1661,7 @@ class KafkaConfig private(doLog: Boolean, val props: 
java.util.Map[_, _], dynami
   val metadataSnapshotMaxNewRecordBytes = 
getLong(KafkaConfig.MetadataSnapshotMaxNewRecordBytesProp)
 
   /************* Authorizer Configuration ***********/
-  val authorizer: Option[Authorizer] = {
+  def createNewAuthorizer(): Option[Authorizer] = {
     val className = getString(KafkaConfig.AuthorizerClassNameProp)
     if (className == null || className.isEmpty)
       None
@@ -1662,6 +1670,23 @@ class KafkaConfig private(doLog: Boolean, val props: 
java.util.Map[_, _], dynami
     }
   }
 
+  val earlyStartListeners: Set[ListenerName] = {
+    val listenersSet = listeners.map(_.listenerName).toSet
+    val controllerListenersSet = controllerListeners.map(_.listenerName).toSet
+    Option(getString(KafkaConfig.EarlyStartListenersProp)) match {
+      case None => controllerListenersSet
+      case Some(str) =>
+        str.split(",").map(_.trim()).filter(!_.isEmpty).map { str =>
+          val listenerName = new ListenerName(str)
+          if (!listenersSet.contains(listenerName) && 
!controllerListenersSet.contains(listenerName))
+            throw new ConfigException(s"${KafkaConfig.EarlyStartListenersProp} 
contains " +
+              s"listener ${listenerName.value()}, but this is not contained in 
" +
+              s"${KafkaConfig.ListenersProp} or 
${KafkaConfig.ControllerListenerNamesProp}")
+          listenerName
+        }.toSet
+    }
+  }
+
   /** ********* Socket Server Configuration ***********/
   val socketSendBufferBytes = getInt(KafkaConfig.SocketSendBufferBytesProp)
   val socketReceiveBufferBytes = 
getInt(KafkaConfig.SocketReceiveBufferBytesProp)
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala 
b/core/src/main/scala/kafka/server/KafkaServer.scala
index c69f43c9eff..b1273ed628f 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -307,7 +307,6 @@ class KafkaServer(
         // Note that we allow the use of KRaft mode controller APIs when 
forwarding is enabled
         // so that the Envelope request is exposed. This is only used in 
testing currently.
         socketServer = new SocketServer(config, metrics, time, 
credentialProvider, apiVersionManager)
-        socketServer.startup(startProcessingRequests = false)
 
         // Start alter partition manager based on the IBP version
         alterIsrManager = if 
(config.interBrokerProtocolVersion.isAlterPartitionSupported) {
@@ -381,7 +380,7 @@ class KafkaServer(
         )
 
         /* Get the authorizer and initialize it if one is specified.*/
-        authorizer = config.authorizer
+        authorizer = config.createNewAuthorizer()
         authorizer.foreach(_.configure(config.originals))
         val authorizerFutures: Map[Endpoint, CompletableFuture[Void]] = 
authorizer match {
           case Some(authZ) =>
@@ -450,7 +449,7 @@ class KafkaServer(
         dynamicConfigManager = new ZkConfigManager(zkClient, 
dynamicConfigHandlers)
         dynamicConfigManager.startup()
 
-        socketServer.startProcessingRequests(authorizerFutures)
+        socketServer.enableRequestProcessing(authorizerFutures)
 
         _brokerState = BrokerState.RUNNING
         shutdownLatch = new CountDownLatch(1)
diff --git a/core/src/main/scala/kafka/tools/TestRaftServer.scala 
b/core/src/main/scala/kafka/tools/TestRaftServer.scala
index 0b27f7fcb52..56104df8212 100644
--- a/core/src/main/scala/kafka/tools/TestRaftServer.scala
+++ b/core/src/main/scala/kafka/tools/TestRaftServer.scala
@@ -19,6 +19,7 @@ package kafka.tools
 
 import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
 import java.util.concurrent.{CompletableFuture, CountDownLatch, 
LinkedBlockingDeque, TimeUnit}
+
 import joptsimple.OptionException
 import kafka.network.{DataPlaneAcceptor, SocketServer}
 import kafka.raft.{KafkaRaftManager, RaftManager}
@@ -74,7 +75,6 @@ class TestRaftServer(
 
     val apiVersionManager = new 
SimpleApiVersionManager(ListenerType.CONTROLLER)
     socketServer = new SocketServer(config, metrics, time, credentialProvider, 
apiVersionManager)
-    socketServer.startup(startProcessingRequests = false)
 
     val metaProperties = MetaProperties(
       clusterId = Uuid.ZERO_UUID.toString,
@@ -119,7 +119,7 @@ class TestRaftServer(
 
     workloadGenerator.start()
     raftManager.startup()
-    socketServer.startProcessingRequests(Map.empty)
+    socketServer.enableRequestProcessing(Map.empty)
   }
 
   def shutdown(): Unit = {
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala 
b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 9b4e81ab39e..98f92d61ff2 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -79,7 +79,7 @@ class SocketServerTest {
 
   private val apiVersionManager = new 
SimpleApiVersionManager(ListenerType.ZK_BROKER)
   val server = new SocketServer(config, metrics, Time.SYSTEM, 
credentialProvider, apiVersionManager)
-  server.startup()
+  server.enableRequestProcessing(Map.empty)
   val sockets = new ArrayBuffer[Socket]
 
   private val kafkaLogger = org.apache.log4j.LogManager.getLogger("kafka")
@@ -296,20 +296,18 @@ class SocketServerTest {
     shutdownServerAndMetrics(server)
     val testProps = new Properties
     testProps ++= props
-    testProps.put("listeners", 
"EXTERNAL://localhost:0,INTERNAL://localhost:0,CONTROLLER://localhost:0")
-    testProps.put("listener.security.protocol.map", 
"EXTERNAL:PLAINTEXT,INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT")
-    testProps.put("control.plane.listener.name", "CONTROLLER")
+    testProps.put("listeners", 
"EXTERNAL://localhost:0,INTERNAL://localhost:0,CONTROL_PLANE://localhost:0")
+    testProps.put("listener.security.protocol.map", 
"EXTERNAL:PLAINTEXT,INTERNAL:PLAINTEXT,CONTROL_PLANE:PLAINTEXT")
+    testProps.put("control.plane.listener.name", "CONTROL_PLANE")
     testProps.put("inter.broker.listener.name", "INTERNAL")
     val config = KafkaConfig.fromProps(testProps)
     val testableServer = new TestableSocketServer(config)
-    testableServer.startup(startProcessingRequests = false)
 
     val updatedEndPoints = config.effectiveAdvertisedListeners.map { endpoint 
=>
       endpoint.copy(port = testableServer.boundPort(endpoint.listenerName))
     }.map(_.toJava)
 
     val externalReadyFuture = new CompletableFuture[Void]()
-    val executor = Executors.newSingleThreadExecutor()
 
     def controlPlaneListenerStarted() = {
       try {
@@ -334,18 +332,19 @@ class SocketServerTest {
     try {
       val externalListener = new ListenerName("EXTERNAL")
       val externalEndpoint = updatedEndPoints.find(e => e.listenerName.get == 
externalListener.value).get
-      val futures = Map(externalEndpoint -> externalReadyFuture)
-      val startFuture = executor.submit((() => 
testableServer.startProcessingRequests(futures)): Runnable)
+      val controlPlaneListener = new ListenerName("CONTROL_PLANE")
+      val controlPlaneEndpoint = updatedEndPoints.find(e => e.listenerName.get 
== controlPlaneListener.value).get
+      val futures = Map(
+        externalEndpoint -> externalReadyFuture,
+        controlPlaneEndpoint -> CompletableFuture.completedFuture[Void](null))
+      testableServer.enableRequestProcessing(futures)
       TestUtils.waitUntilTrue(() => controlPlaneListenerStarted(), "Control 
plane listener not started")
-      TestUtils.waitUntilTrue(() => 
listenerStarted(config.interBrokerListenerName), "Inter-broker listener not 
started")
-      assertFalse(startFuture.isDone, "Socket server startup did not wait for 
future to complete")
-
+      assertFalse(listenerStarted(config.interBrokerListenerName), "Inter-broker listener should not be started yet")
       assertFalse(listenerStarted(externalListener))
-
       externalReadyFuture.complete(null)
+      TestUtils.waitUntilTrue(() => 
listenerStarted(config.interBrokerListenerName), "Inter-broker listener not 
started")
       TestUtils.waitUntilTrue(() => listenerStarted(externalListener), 
"External listener not started")
     } finally {
-      executor.shutdownNow()
       shutdownServerAndMetrics(testableServer)
     }
   }
@@ -362,7 +361,6 @@ class SocketServerTest {
     val config = KafkaConfig.fromProps(testProps)
     val connectionQueueSize = 1
     val testableServer = new TestableSocketServer(config, connectionQueueSize)
-    testableServer.startup(startProcessingRequests = false)
 
     val socket1 = connect(testableServer, new ListenerName("EXTERNAL"), 
localAddr = InetAddress.getLocalHost)
     sendRequest(socket1, producerRequestBytes())
@@ -468,7 +466,7 @@ class SocketServerTest {
       time, credentialProvider, apiVersionManager)
 
     try {
-      overrideServer.startup()
+      overrideServer.enableRequestProcessing(Map.empty)
       val serializedBytes = producerRequestBytes()
 
       // Connection with no outstanding requests
@@ -536,7 +534,7 @@ class SocketServerTest {
     }
 
     try {
-      overrideServer.startup()
+      overrideServer.enableRequestProcessing(Map.empty)
       overrideServer.testableProcessor.setConnectionId(overrideConnectionId)
       val socket1 = connectAndWaitForConnectionRegister()
       TestUtils.waitUntilTrue(() => connectionCount == 1 && 
openChannel.isDefined, "Failed to create channel")
@@ -805,7 +803,7 @@ class SocketServerTest {
     val server = new SocketServer(KafkaConfig.fromProps(newProps), new 
Metrics(),
       Time.SYSTEM, credentialProvider, apiVersionManager)
     try {
-      server.startup()
+      server.enableRequestProcessing(Map.empty)
       // make the maximum allowable number of connections
       val conns = (0 until 5).map(_ => connect(server))
       // now try one more (should fail)
@@ -844,7 +842,7 @@ class SocketServerTest {
     val overrideServer = new 
SocketServer(KafkaConfig.fromProps(overrideProps), serverMetrics,
       Time.SYSTEM, credentialProvider, apiVersionManager)
     try {
-      overrideServer.startup()
+      overrideServer.enableRequestProcessing(Map.empty)
       // make the maximum allowable number of connections
       val conns = (0 until overrideNum).map(_ => connect(overrideServer))
 
@@ -884,7 +882,7 @@ class SocketServerTest {
     }
 
     try {
-      overrideServer.startup()
+      overrideServer.enableRequestProcessing(Map.empty)
       val conn = connect(overrideServer)
       conn.setSoTimeout(3000)
       assertEquals(-1, conn.getInputStream.read())
@@ -907,7 +905,7 @@ class SocketServerTest {
     // update the connection rate to 5
     overrideServer.connectionQuotas.updateIpConnectionRateQuota(None, 
Some(connectionRate))
     try {
-      overrideServer.startup()
+      overrideServer.enableRequestProcessing(Map.empty)
       // make the (maximum allowable number + 1) of connections
       (0 to connectionRate).map(_ => connect(overrideServer))
 
@@ -956,7 +954,7 @@ class SocketServerTest {
     val overrideServer = new 
SocketServer(KafkaConfig.fromProps(overrideProps), new Metrics(),
       time, credentialProvider, apiVersionManager)
     overrideServer.connectionQuotas.updateIpConnectionRateQuota(None, 
Some(connectionRate))
-    overrideServer.startup()
+    overrideServer.enableRequestProcessing(Map.empty)
     // make the maximum allowable number of connections
     (0 until connectionRate).map(_ => connect(overrideServer))
     // now try one more (should get throttled)
@@ -979,7 +977,7 @@ class SocketServerTest {
     val overrideServer = new 
SocketServer(KafkaConfig.fromProps(sslServerProps), serverMetrics,
       Time.SYSTEM, credentialProvider, apiVersionManager)
     try {
-      overrideServer.startup()
+      overrideServer.enableRequestProcessing(Map.empty)
       val sslContext = 
SSLContext.getInstance(TestSslUtils.DEFAULT_TLS_PROTOCOL_FOR_TESTS)
       sslContext.init(null, Array(TestUtils.trustAllCerts), new 
java.security.SecureRandom())
       val socketFactory = sslContext.getSocketFactory
@@ -1038,7 +1036,7 @@ class SocketServerTest {
     val time = new MockTime()
     val overrideServer = new 
TestableSocketServer(KafkaConfig.fromProps(overrideProps), time = time)
     try {
-      overrideServer.startup()
+      overrideServer.enableRequestProcessing(Map.empty)
       val socket = connect(overrideServer, 
ListenerName.forSecurityProtocol(SecurityProtocol.SASL_PLAINTEXT))
 
       val correlationId = -1
@@ -1118,7 +1116,7 @@ class SocketServerTest {
     val overrideServer = new TestableSocketServer(KafkaConfig.fromProps(props))
 
     try {
-      overrideServer.startup()
+      overrideServer.enableRequestProcessing(Map.empty)
       val conn: Socket = connect(overrideServer)
       overrideServer.testableProcessor.closeSocketOnSendResponse(conn)
       val serializedBytes = producerRequestBytes()
@@ -1150,7 +1148,7 @@ class SocketServerTest {
     val overrideServer = new TestableSocketServer(KafkaConfig.fromProps(props))
 
     try {
-      overrideServer.startup()
+      overrideServer.enableRequestProcessing(Map.empty)
       val selector = overrideServer.testableSelector
 
       // Create a channel, send some requests and close socket. Receive one 
pending request after socket was closed.
@@ -1178,7 +1176,7 @@ class SocketServerTest {
     val overrideServer = new SocketServer(KafkaConfig.fromProps(props), 
serverMetrics,
       Time.SYSTEM, credentialProvider, apiVersionManager)
     try {
-      overrideServer.startup()
+      overrideServer.enableRequestProcessing(Map.empty)
       conn = connect(overrideServer)
       val serializedBytes = producerRequestBytes()
       sendRequest(conn, serializedBytes)
@@ -1559,7 +1557,7 @@ class SocketServerTest {
     props.put(KafkaConfig.ConnectionsMaxIdleMsProp, idleTimeMs.toString)
     props ++= sslServerProps
     val testableServer = new TestableSocketServer(time = time)
-    testableServer.startup()
+    testableServer.enableRequestProcessing(Map.empty)
 
     assertTrue(testableServer.controlPlaneRequestChannelOpt.isEmpty)
 
@@ -1595,7 +1593,7 @@ class SocketServerTest {
     val time = new MockTime()
     props ++= sslServerProps
     val testableServer = new TestableSocketServer(time = time)
-    testableServer.startup()
+    testableServer.enableRequestProcessing(Map.empty)
     val proxyServer = new ProxyServer(testableServer)
     try {
       val testableSelector = testableServer.testableSelector
@@ -1741,7 +1739,7 @@ class SocketServerTest {
     val numConnections = 5
     props.put("max.connections.per.ip", numConnections.toString)
     val testableServer = new 
TestableSocketServer(KafkaConfig.fromProps(props), connectionQueueSize = 1)
-    testableServer.startup()
+    testableServer.enableRequestProcessing(Map.empty)
     val testableSelector = testableServer.testableSelector
     val errors = new mutable.HashSet[String]
 
@@ -1893,7 +1891,9 @@ class SocketServerTest {
                                  startProcessingRequests: Boolean = true): 
Unit = {
     shutdownServerAndMetrics(server)
     val testableServer = new TestableSocketServer(config)
-    testableServer.startup(startProcessingRequests = startProcessingRequests)
+    if (startProcessingRequests) {
+      testableServer.enableRequestProcessing(Map.empty)
+    }
     try {
       testWithServer(testableServer)
     } finally {
@@ -1998,7 +1998,8 @@ class SocketServerTest {
                     new LogContext(),
                     connectionQueueSize,
                     isPrivilegedListener,
-                    apiVersionManager) {
+                    apiVersionManager,
+                    s"TestableProcessor${id}") {
     private var connectionId: Option[String] = None
     private var conn: Option[Socket] = None
 
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index e5ff0cd1080..e879a7f6ffc 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -1486,13 +1486,19 @@ class KafkaConfigTest {
     assertEquals("3", originals.get(KafkaConfig.NodeIdProp))
   }
 
-  @Test
-  def testBrokerIdIsInferredByNodeIdWithKraft(): Unit = {
+  def kraftProps(): Properties = {
     val props = new Properties()
     props.setProperty(KafkaConfig.ProcessRolesProp, "broker")
-    props.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
+    props.setProperty(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
     props.setProperty(KafkaConfig.NodeIdProp, "3")
     props.setProperty(KafkaConfig.QuorumVotersProp, "1@localhost:9093")
+    props
+  }
+
+  @Test
+  def testBrokerIdIsInferredByNodeIdWithKraft(): Unit = {
+    val props = new Properties()
+    props.putAll(kraftProps())
     val config = KafkaConfig.fromProps(props)
     assertEquals(3, config.brokerId)
     assertEquals(3, config.nodeId)
@@ -1527,4 +1533,40 @@ class KafkaConfigTest {
     
assertTrue(ce.getMessage.contains(KafkaConfig.InterBrokerSecurityProtocolProp))
   }
 
+  @Test
+  def testEarlyStartListenersDefault(): Unit = {
+    val props = new Properties()
+    props.setProperty(KafkaConfig.ProcessRolesProp, "controller")
+    props.setProperty(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
+    props.setProperty(KafkaConfig.ListenersProp, "CONTROLLER://:8092")
+    props.setProperty(KafkaConfig.NodeIdProp, "1")
+    props.setProperty(KafkaConfig.QuorumVotersProp, "1@localhost:9093")
+    val config = new KafkaConfig(props)
+    assertEquals(Set("CONTROLLER"), config.earlyStartListeners.map(_.value()))
+  }
+
+  @Test
+  def testEarlyStartListeners(): Unit = {
+    val props = new Properties()
+    props.putAll(kraftProps())
+    props.setProperty(KafkaConfig.EarlyStartListenersProp, 
"INTERNAL,INTERNAL2")
+    props.setProperty(KafkaConfig.InterBrokerListenerNameProp, "INTERNAL")
+    props.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp,
+      "INTERNAL:PLAINTEXT,INTERNAL2:PLAINTEXT,CONTROLLER:PLAINTEXT")
+    props.setProperty(KafkaConfig.ListenersProp,
+      "INTERNAL://127.0.0.1:9092,INTERNAL2://127.0.0.1:9093")
+    val config = new KafkaConfig(props)
+    assertEquals(Set(new ListenerName("INTERNAL"), new 
ListenerName("INTERNAL2")),
+      config.earlyStartListeners)
+  }
+
+  @Test
+  def testEarlyStartListenersMustBeListeners(): Unit = {
+    val props = new Properties()
+    props.putAll(kraftProps())
+    props.setProperty(KafkaConfig.EarlyStartListenersProp, "INTERNAL")
+    assertEquals("early.start.listeners contains listener INTERNAL, but this 
is not " +
+      "contained in listeners or controller.listener.names",
+        assertThrows(classOf[ConfigException], () => new 
KafkaConfig(props)).getMessage)
+  }
 }
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java 
b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 9ff6a599577..cea62c8f00b 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -167,6 +167,10 @@ public final class QuorumController implements Controller {
             this.clusterId = clusterId;
         }
 
+        public int nodeId() {
+            return nodeId;
+        }
+
         public Builder setTime(Time time) {
             this.time = time;
             return this;
@@ -753,11 +757,11 @@ public final class QuorumController implements Controller 
{
     }
 
     class QuorumMetaLogListener implements 
RaftClient.Listener<ApiMessageAndVersion> {
-
         @Override
         public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
             appendRaftEvent("handleCommit[baseOffset=" + reader.baseOffset() + 
"]", () -> {
                 try {
+                    maybeCompleteAuthorizerInitialLoad();
                     boolean isActiveController = curClaimEpoch != -1;
                     long processedRecordsSize = 0;
                     while (reader.hasNext()) {
@@ -921,12 +925,35 @@ public final class QuorumController implements Controller 
{
                 if (this != metaLogListener) {
                     log.debug("Ignoring {} raft event from an old 
registration", name);
                 } else {
-                    runnable.run();
+                    try {
+                        runnable.run();
+                    } finally {
+                        maybeCompleteAuthorizerInitialLoad();
+                    }
                 }
             });
         }
     }
 
+    private void maybeCompleteAuthorizerInitialLoad() {
+        if (!needToCompleteAuthorizerLoad) return;
+        OptionalLong highWatermark = raftClient.highWatermark();
+        if (highWatermark.isPresent()) {
+            if (lastCommittedOffset + 1 >= highWatermark.getAsLong()) {
+                log.info("maybeCompleteAuthorizerInitialLoad: completing 
authorizer " +
+                    "initial load at last committed offset {}.", 
lastCommittedOffset);
+                authorizer.get().completeInitialLoad();
+                needToCompleteAuthorizerLoad = false;
+            } else {
+                log.trace("maybeCompleteAuthorizerInitialLoad: can't proceed 
because " +
+                    "lastCommittedOffset = {}, but highWatermark = {}.",
+                    lastCommittedOffset, highWatermark.getAsLong());
+            }
+        } else {
+            log.trace("maybeCompleteAuthorizerInitialLoad: highWatermark not 
set.");
+        }
+    }
+
     private void renounce() {
         curClaimEpoch = -1;
         controllerMetrics.setActive(false);
@@ -1276,6 +1303,12 @@ public final class QuorumController implements 
Controller {
      */
     private long lastCommittedTimestamp = -1;
 
+    /**
+     * True if we need to complete the authorizer initial load.
+     * This must be accessed only by the event queue thread.
+     */
+    private boolean needToCompleteAuthorizerLoad;
+
     /**
      * If we have called scheduleWrite, this is the last offset we got back 
from it.
      */
@@ -1384,9 +1417,12 @@ public final class QuorumController implements 
Controller {
         this.metaLogListener = new QuorumMetaLogListener();
         this.curClaimEpoch = -1;
         this.writeOffset = -1L;
+        this.needToCompleteAuthorizerLoad = authorizer.isPresent();
 
         resetState();
 
+        log.info("Creating new QuorumController with clusterId {}, authorizer 
{}.", clusterId, authorizer);
+
         this.raftClient.register(metaLogListener);
     }
 
diff --git 
a/metadata/src/main/java/org/apache/kafka/metadata/authorizer/ClusterMetadataAuthorizer.java
 
b/metadata/src/main/java/org/apache/kafka/metadata/authorizer/ClusterMetadataAuthorizer.java
index cae35283335..d7673022897 100644
--- 
a/metadata/src/main/java/org/apache/kafka/metadata/authorizer/ClusterMetadataAuthorizer.java
+++ 
b/metadata/src/main/java/org/apache/kafka/metadata/authorizer/ClusterMetadataAuthorizer.java
@@ -56,6 +56,18 @@ public interface ClusterMetadataAuthorizer extends 
Authorizer {
      */
     AclMutator aclMutatorOrException();
 
+    /**
+     * Complete the initial load of the cluster metadata authorizer, so that 
all
+     * principals can use it.
+     */
+    void completeInitialLoad();
+
+    /**
+     * Complete the initial load of the cluster metadata authorizer with an 
exception,
+     * indicating that the loading process has failed.
+     */
+    void completeInitialLoad(Exception e);
+
     /**
      * Load the ACLs in the given map. Anything not in the map will be removed.
      * The authorizer will also wait for this initial snapshot load to 
complete when
diff --git 
a/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java
 
b/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java
index 510a6f87075..611625a4f3a 100644
--- 
a/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java
+++ 
b/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java
@@ -54,10 +54,9 @@ public class StandardAuthorizer implements 
ClusterMetadataAuthorizer {
     public final static String ALLOW_EVERYONE_IF_NO_ACL_IS_FOUND_CONFIG = 
"allow.everyone.if.no.acl.found";
 
     /**
-     * A future which is completed once we have loaded a snapshot.
-     * TODO: KAFKA-13649: StandardAuthorizer should not finish loading until 
it reads up to the high water mark.
+     * A future which is completed once we have loaded up to the initial high 
water mark.
      */
-    private final CompletableFuture<Void> initialLoadFuture = 
CompletableFuture.completedFuture(null);
+    private final CompletableFuture<Void> initialLoadFuture = new 
CompletableFuture<>();
 
     /**
      * The current data. Can be read without a lock. Must be written while 
holding the object lock.
@@ -78,6 +77,24 @@ public class StandardAuthorizer implements 
ClusterMetadataAuthorizer {
         return aclMutator;
     }
 
+    @Override
+    public synchronized void completeInitialLoad() {
+        data = data.copyWithNewLoadingComplete(true);
+        data.log.info("Completed initial ACL load process.");
+        initialLoadFuture.complete(null);
+    }
+
+    // Visible for testing
+    public CompletableFuture<Void> initialLoadFuture() {
+        return initialLoadFuture;
+    }
+
+    @Override
+    public void completeInitialLoad(Exception e) {
+        data.log.error("Failed to complete initial ACL load process.", e);
+        initialLoadFuture.completeExceptionally(e);
+    }
+
     @Override
     public void addAcl(Uuid id, StandardAcl acl) {
         data.addAcl(id, acl);
@@ -98,7 +115,12 @@ public class StandardAuthorizer implements 
ClusterMetadataAuthorizer {
             AuthorizerServerInfo serverInfo) {
         Map<Endpoint, CompletableFuture<Void>> result = new HashMap<>();
         for (Endpoint endpoint : serverInfo.endpoints()) {
-            result.put(endpoint, initialLoadFuture);
+            if (serverInfo.earlyStartListeners().contains(
+                    endpoint.listenerName().orElse(""))) {
+                result.put(endpoint, CompletableFuture.completedFuture(null));
+            } else {
+                result.put(endpoint, initialLoadFuture);
+            }
         }
         return result;
     }
@@ -131,7 +153,6 @@ public class StandardAuthorizer implements 
ClusterMetadataAuthorizer {
         // Complete the initialLoadFuture, if it hasn't been completed already.
         initialLoadFuture.completeExceptionally(new TimeoutException("The 
authorizer was " +
             "closed before the initial load could complete."));
-        // Nothing else to do here.
     }
 
     @Override
diff --git 
a/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java
 
b/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java
index 27cca4271b8..b46c839e4fa 100644
--- 
a/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java
+++ 
b/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.acl.AclBinding;
 import org.apache.kafka.common.acl.AclBindingFilter;
 import org.apache.kafka.common.acl.AclOperation;
 import org.apache.kafka.common.acl.AclPermissionType;
+import org.apache.kafka.common.errors.AuthorizerNotReadyException;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.resource.PatternType;
 import org.apache.kafka.common.resource.ResourcePattern;
@@ -92,6 +93,11 @@ public class StandardAuthorizerData {
      */
     final AclMutator aclMutator;
 
+    /**
+     * True if the authorizer loading process is complete.
+     */
+    final boolean loadingComplete;
+
     /**
      * A statically configured set of users that are authorized to do anything.
      */
@@ -123,6 +129,7 @@ public class StandardAuthorizerData {
     static StandardAuthorizerData createEmpty() {
         return new StandardAuthorizerData(createLogger(-1),
             null,
+            false,
             Collections.emptySet(),
             DENIED,
             new ConcurrentSkipListSet<>(), new ConcurrentHashMap<>());
@@ -130,6 +137,7 @@ public class StandardAuthorizerData {
 
     private StandardAuthorizerData(Logger log,
                                    AclMutator aclMutator,
+                                   boolean loadingComplete,
                                    Set<String> superUsers,
                                    AuthorizationResult defaultResult,
                                    ConcurrentSkipListSet<StandardAcl> 
aclsByResource,
@@ -137,6 +145,7 @@ public class StandardAuthorizerData {
         this.log = log;
         this.auditLog = auditLogger();
         this.aclMutator = aclMutator;
+        this.loadingComplete = loadingComplete;
         this.superUsers = superUsers;
         this.defaultRule = new DefaultRule(defaultResult);
         this.aclsByResource = aclsByResource;
@@ -147,6 +156,17 @@ public class StandardAuthorizerData {
         return new StandardAuthorizerData(
             log,
             newAclMutator,
+            loadingComplete,
+            superUsers,
+            defaultRule.result,
+            aclsByResource,
+            aclsById);
+    }
+
+    StandardAuthorizerData copyWithNewLoadingComplete(boolean 
newLoadingComplete) {
+        return new StandardAuthorizerData(log,
+            aclMutator,
+            newLoadingComplete,
             superUsers,
             defaultRule.result,
             aclsByResource,
@@ -159,6 +179,7 @@ public class StandardAuthorizerData {
         return new StandardAuthorizerData(
             createLogger(nodeId),
             aclMutator,
+            loadingComplete,
             newSuperUsers,
             newDefaultResult,
             aclsByResource,
@@ -169,6 +190,7 @@ public class StandardAuthorizerData {
         StandardAuthorizerData newData = new StandardAuthorizerData(
             log,
             aclMutator,
+            loadingComplete,
             superUsers,
             defaultRule.result,
             new ConcurrentSkipListSet<>(),
@@ -250,6 +272,8 @@ public class StandardAuthorizerData {
         // Superusers are authorized to do anything.
         if (superUsers.contains(principal.toString())) {
             rule = SuperUserRule.INSTANCE;
+        } else if (!loadingComplete) {
+            throw new AuthorizerNotReadyException();
         } else {
             MatchingAclRule aclRule = findAclRule(
                 matchingPrincipals(requestContext),
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/AclControlManagerTest.java 
b/metadata/src/test/java/org/apache/kafka/controller/AclControlManagerTest.java
index 08b362a7276..fdc03276451 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/AclControlManagerTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/AclControlManagerTest.java
@@ -148,6 +148,16 @@ public class AclControlManagerTest {
             throw new NotControllerException("The current node is not the 
active controller.");
         }
 
+        @Override
+        public void completeInitialLoad() {
+            // do nothing
+        }
+
+        @Override
+        public void completeInitialLoad(Exception e) {
+            // do nothing
+        }
+
         @Override
         public void loadSnapshot(Map<Uuid, StandardAcl> acls) {
             this.acls = new HashMap<>(acls);
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java 
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index 9c679ad7d1e..148c5720b6d 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.controller;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -81,6 +82,7 @@ import org.apache.kafka.metadata.BrokerRegistrationReply;
 import org.apache.kafka.metadata.MetadataRecordSerde;
 import org.apache.kafka.metadata.PartitionRegistration;
 import org.apache.kafka.metadata.RecordTestUtils;
+import org.apache.kafka.metadata.authorizer.StandardAuthorizer;
 import org.apache.kafka.metalog.LocalLogManagerTestEnv;
 import org.apache.kafka.raft.Batch;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
@@ -91,6 +93,7 @@ import org.apache.kafka.test.TestUtils;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
+import static java.util.function.Function.identity;
 import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET;
 import static org.apache.kafka.common.config.ConfigResource.Type.BROKER;
 import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC;
@@ -1071,4 +1074,50 @@ public class QuorumControllerTest {
             }
         }
     }
+
+    private static final Uuid FOO_ID = 
Uuid.fromString("igRktLOnR8ektWHr79F8mw");
+
+    private static final Map<Integer, Long> ALL_ZERO_BROKER_EPOCHS =
+        IntStream.of(0, 1, 2, 3).boxed().collect(Collectors.toMap(identity(), 
__ -> 0L));
+
+    @Test
+    public void testQuorumControllerCompletesAuthorizerInitialLoad() throws 
Throwable {
+        final int numControllers = 3;
+        List<StandardAuthorizer> authorizers = new ArrayList<>(numControllers);
+        for (int i = 0; i < numControllers; i++) {
+            StandardAuthorizer authorizer = new StandardAuthorizer();
+            authorizer.configure(Collections.emptyMap());
+            authorizers.add(authorizer);
+        }
+        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(
+            numControllers,
+            Optional.empty(),
+            shared -> {
+                shared.setInitialMaxReadOffset(2);
+            }
+        )) {
+            logEnv.appendInitialRecords(expectedSnapshotContent(FOO_ID, 
ALL_ZERO_BROKER_EPOCHS));
+            logEnv.logManagers().forEach(m -> m.setMaxReadOffset(2));
+            try (QuorumControllerTestEnv controlEnv = new 
QuorumControllerTestEnv(logEnv, b -> {
+                b.setAuthorizer(authorizers.get(b.nodeId()));
+            })) {
+                assertInitialLoadFuturesNotComplete(authorizers);
+                logEnv.logManagers().get(0).setMaxReadOffset(Long.MAX_VALUE);
+                QuorumController active = controlEnv.activeController();
+                active.unregisterBroker(ANONYMOUS_CONTEXT, 3).get();
+                
assertInitialLoadFuturesNotComplete(authorizers.stream().skip(1).collect(Collectors.toList()));
+                logEnv.logManagers().forEach(m -> 
m.setMaxReadOffset(Long.MAX_VALUE));
+                TestUtils.waitForCondition(() -> {
+                    return authorizers.stream().allMatch(a -> 
a.initialLoadFuture().isDone());
+                }, "Failed to complete initial authorizer load for all 
controllers.");
+            }
+        }
+    }
+
+    private static void 
assertInitialLoadFuturesNotComplete(List<StandardAuthorizer> authorizers) {
+        for (int i = 0; i < authorizers.size(); i++) {
+            assertFalse(authorizers.get(i).initialLoadFuture().isDone(),
+                "authorizer " + i + " should not have completed loading.");
+        }
+    }
 }
diff --git 
a/metadata/src/test/java/org/apache/kafka/metadata/authorizer/ClusterMetadataAuthorizerTest.java
 
b/metadata/src/test/java/org/apache/kafka/metadata/authorizer/ClusterMetadataAuthorizerTest.java
index 19b07abed2c..3242170bcdf 100644
--- 
a/metadata/src/test/java/org/apache/kafka/metadata/authorizer/ClusterMetadataAuthorizerTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/metadata/authorizer/ClusterMetadataAuthorizerTest.java
@@ -106,6 +106,16 @@ public class ClusterMetadataAuthorizerTest {
             return aclMutator;
         }
 
+        @Override
+        public void completeInitialLoad() {
+            // do nothing
+        }
+
+        @Override
+        public void completeInitialLoad(Exception e) {
+            // do nothing
+        }
+
         @Override
         public void loadSnapshot(Map<Uuid, StandardAcl> acls) {
             // do nothing
diff --git 
a/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java
 
b/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java
index 7ed37785c42..987c00155c4 100644
--- 
a/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java
@@ -17,19 +17,25 @@
 
 package org.apache.kafka.metadata.authorizer;
 
+import org.apache.kafka.common.ClusterResource;
+import org.apache.kafka.common.Endpoint;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.acl.AccessControlEntryFilter;
 import org.apache.kafka.common.acl.AclBinding;
 import org.apache.kafka.common.acl.AclBindingFilter;
 import org.apache.kafka.common.acl.AclOperation;
 import org.apache.kafka.common.acl.AclPermissionType;
+import org.apache.kafka.common.errors.AuthorizerNotReadyException;
+import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.resource.PatternType;
 import org.apache.kafka.common.resource.ResourcePattern;
 import org.apache.kafka.common.resource.ResourcePatternFilter;
 import org.apache.kafka.common.resource.ResourceType;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.server.authorizer.Action;
 import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
+import org.apache.kafka.server.authorizer.AuthorizerServerInfo;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -40,12 +46,17 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.net.InetAddress;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.CompletionStage;
 
 import static java.util.Arrays.asList;
 import static java.util.Collections.singletonList;
@@ -82,6 +93,56 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 @Timeout(value = 40)
 public class StandardAuthorizerTest {
+    public static final Endpoint PLAINTEXT = new Endpoint("PLAINTEXT",
+        SecurityProtocol.PLAINTEXT,
+        "127.0.0.1",
+        9020);
+
+    public static final Endpoint CONTROLLER = new Endpoint("CONTROLLER",
+        SecurityProtocol.PLAINTEXT,
+        "127.0.0.1",
+        9020);
+
+    static class AuthorizerTestServerInfo implements AuthorizerServerInfo {
+        private final Collection<Endpoint> endpoints;
+
+        AuthorizerTestServerInfo(Collection<Endpoint> endpoints) {
+            assertFalse(endpoints.isEmpty());
+            this.endpoints = endpoints;
+        }
+
+        @Override
+        public ClusterResource clusterResource() {
+            return new 
ClusterResource(Uuid.fromString("r7mqHQrxTNmzbKvCvWZzLQ").toString());
+        }
+
+        @Override
+        public int brokerId() {
+            return 0;
+        }
+
+        @Override
+        public Collection<Endpoint> endpoints() {
+            return endpoints;
+        }
+
+        @Override
+        public Endpoint interBrokerEndpoint() {
+            return endpoints.iterator().next();
+        }
+
+        @Override
+        public Collection<String> earlyStartListeners() {
+            List<String> result = new ArrayList<>();
+            for (Endpoint endpoint : endpoints) {
+                if (endpoint.listenerName().get().equals("CONTROLLER")) {
+                    result.add(endpoint.listenerName().get());
+                }
+            }
+            return result;
+        }
+    }
+
     @Test
     public void testGetConfiguredSuperUsers() {
         assertEquals(Collections.emptySet(),
@@ -124,6 +185,16 @@ public class StandardAuthorizerTest {
             new ResourcePattern(resourceType, resourceName, LITERAL), 1, 
false, false);
     }
 
+    static StandardAuthorizer createAndInitializeStandardAuthorizer() {
+        StandardAuthorizer authorizer = new StandardAuthorizer();
+        authorizer.configure(Collections.singletonMap(SUPER_USERS_CONFIG, 
"User:superman"));
+        authorizer.start(new 
AuthorizerTestServerInfo(Collections.singletonList(PLAINTEXT)));
+        authorizer.completeInitialLoad();
+        return authorizer;
+    }
+
+    private final static AtomicLong NEXT_ID = new AtomicLong(0);
+
     static StandardAcl newFooAcl(AclOperation op, AclPermissionType 
permission) {
         return new StandardAcl(
             TOPIC,
@@ -225,8 +296,7 @@ public class StandardAuthorizerTest {
 
     @Test
     public void testListAcls() throws Exception {
-        StandardAuthorizer authorizer = new StandardAuthorizer();
-        authorizer.configure(Collections.emptyMap());
+        StandardAuthorizer authorizer = 
createAndInitializeStandardAuthorizer();
         List<StandardAclWithId> fooAcls = asList(
             withId(newFooAcl(READ, ALLOW)),
             withId(newFooAcl(WRITE, ALLOW)));
@@ -247,8 +317,7 @@ public class StandardAuthorizerTest {
 
     @Test
     public void testSimpleAuthorizations() throws Exception {
-        StandardAuthorizer authorizer = new StandardAuthorizer();
-        authorizer.configure(Collections.emptyMap());
+        StandardAuthorizer authorizer = 
createAndInitializeStandardAuthorizer();
         List<StandardAclWithId> fooAcls = asList(
             withId(newFooAcl(READ, ALLOW)),
             withId(newFooAcl(WRITE, ALLOW)));
@@ -269,20 +338,17 @@ public class StandardAuthorizerTest {
 
     @Test
     public void testDenyPrecedenceWithOperationAll() throws Exception {
-        StandardAuthorizer authorizer = new StandardAuthorizer();
-        authorizer.configure(Collections.emptyMap());
+        StandardAuthorizer authorizer = 
createAndInitializeStandardAuthorizer();
         List<StandardAcl> acls = Arrays.asList(
             new StandardAcl(TOPIC, "foo", LITERAL, "User:alice", "*", ALL, 
DENY),
             new StandardAcl(TOPIC, "foo", PREFIXED, "User:alice", "*", READ, 
ALLOW),
             new StandardAcl(TOPIC, "foo", LITERAL, "User:*", "*", ALL, DENY),
             new StandardAcl(TOPIC, "foo", PREFIXED, "User:*", "*", DESCRIBE, 
ALLOW)
         );
-
         acls.forEach(acl -> {
             StandardAclWithId aclWithId = withId(acl);
             authorizer.addAcl(aclWithId.id(), aclWithId.acl());
         });
-
         assertEquals(Arrays.asList(DENIED, DENIED, DENIED, ALLOWED), 
authorizer.authorize(
             newRequestContext("alice"),
             Arrays.asList(
@@ -290,7 +356,6 @@ public class StandardAuthorizerTest {
                 newAction(READ, TOPIC, "foo"),
                 newAction(DESCRIBE, TOPIC, "foo"),
                 newAction(READ, TOPIC, "foobar"))));
-
         assertEquals(Arrays.asList(DENIED, DENIED, DENIED, ALLOWED, DENIED), 
authorizer.authorize(
             newRequestContext("bob"),
             Arrays.asList(
@@ -303,19 +368,16 @@ public class StandardAuthorizerTest {
 
     @Test
     public void testTopicAclWithOperationAll() throws Exception {
-        StandardAuthorizer authorizer = new StandardAuthorizer();
-        authorizer.configure(Collections.emptyMap());
+        StandardAuthorizer authorizer = 
createAndInitializeStandardAuthorizer();
         List<StandardAcl> acls = Arrays.asList(
             new StandardAcl(TOPIC, "foo", LITERAL, "User:*", "*", ALL, ALLOW),
             new StandardAcl(TOPIC, "bar", PREFIXED, "User:alice", "*", ALL, 
ALLOW),
             new StandardAcl(TOPIC, "baz", LITERAL, "User:bob", "*", ALL, ALLOW)
         );
-
         acls.forEach(acl -> {
             StandardAclWithId aclWithId = withId(acl);
             authorizer.addAcl(aclWithId.id(), aclWithId.acl());
         });
-
         assertEquals(Arrays.asList(ALLOWED, ALLOWED, DENIED), 
authorizer.authorize(
             newRequestContext("alice"),
             Arrays.asList(
@@ -349,8 +411,7 @@ public class StandardAuthorizerTest {
         InetAddress host1 = InetAddress.getByName("192.168.1.1");
         InetAddress host2 = InetAddress.getByName("192.168.1.2");
 
-        StandardAuthorizer authorizer = new StandardAuthorizer();
-        authorizer.configure(Collections.emptyMap());
+        StandardAuthorizer authorizer = 
createAndInitializeStandardAuthorizer();
         List<StandardAcl> acls = Arrays.asList(
             new StandardAcl(TOPIC, "foo", LITERAL, "User:alice", 
host1.getHostAddress(), READ, DENY),
             new StandardAcl(TOPIC, "foo", LITERAL, "User:alice", "*", READ, 
ALLOW),
@@ -395,9 +456,7 @@ public class StandardAuthorizerTest {
             .build();
     }
 
-    private static StandardAuthorizer createAuthorizerWithManyAcls() {
-        StandardAuthorizer authorizer = new StandardAuthorizer();
-        authorizer.configure(Collections.emptyMap());
+    private static void addManyAcls(StandardAuthorizer authorizer) {
         List<StandardAcl> acls = Arrays.asList(
             new StandardAcl(TOPIC, "green2", LITERAL, "User:*", "*", READ, 
ALLOW),
             new StandardAcl(TOPIC, "green", PREFIXED, "User:bob", "*", READ, 
ALLOW),
@@ -413,12 +472,12 @@ public class StandardAuthorizerTest {
             StandardAclWithId aclWithId = withId(acl);
             authorizer.addAcl(aclWithId.id(), aclWithId.acl());
         });
-        return authorizer;
     }
 
     @Test
     public void testAuthorizationWithManyAcls() throws Exception {
-        StandardAuthorizer authorizer = createAuthorizerWithManyAcls();
+        StandardAuthorizer authorizer = 
createAndInitializeStandardAuthorizer();
+        addManyAcls(authorizer);
         assertEquals(Arrays.asList(ALLOWED, DENIED),
             authorizer.authorize(new MockAuthorizableRequestContext.Builder().
                     setPrincipal(new KafkaPrincipal(USER_TYPE, "bob")).build(),
@@ -449,7 +508,8 @@ public class StandardAuthorizerTest {
             Mockito.when(auditLog.isDebugEnabled()).thenReturn(true);
             Mockito.when(auditLog.isTraceEnabled()).thenReturn(true);
 
-            StandardAuthorizer authorizer = createAuthorizerWithManyAcls();
+            StandardAuthorizer authorizer = 
createAndInitializeStandardAuthorizer();
+            addManyAcls(authorizer);
             ResourcePattern topicResource = new ResourcePattern(TOPIC, 
"alpha", LITERAL);
             Action action = new Action(READ, topicResource, 1, false, 
logIfDenied);
             MockAuthorizableRequestContext requestContext = new 
MockAuthorizableRequestContext.Builder()
@@ -490,7 +550,8 @@ public class StandardAuthorizerTest {
             Mockito.when(auditLog.isDebugEnabled()).thenReturn(true);
             Mockito.when(auditLog.isTraceEnabled()).thenReturn(true);
 
-            StandardAuthorizer authorizer = createAuthorizerWithManyAcls();
+            StandardAuthorizer authorizer = 
createAndInitializeStandardAuthorizer();
+            addManyAcls(authorizer);
             ResourcePattern topicResource = new ResourcePattern(TOPIC, 
"green1", LITERAL);
             Action action = new Action(READ, topicResource, 1, logIfAllowed, 
false);
             MockAuthorizableRequestContext requestContext = new 
MockAuthorizableRequestContext.Builder()
@@ -514,4 +575,69 @@ public class StandardAuthorizerTest {
         }
     }
 
+    /**
+     * Test that StandardAuthorizer#start returns a completed future for early 
start
+     * listeners.
+     */
+    @Test
+    public void testStartWithEarlyStartListeners() throws Exception {
+        StandardAuthorizer authorizer = new StandardAuthorizer();
+        authorizer.configure(Collections.singletonMap(SUPER_USERS_CONFIG, 
"User:superman"));
+        Map<Endpoint, ? extends CompletionStage<Void>> futures2 = authorizer.
+            start(new AuthorizerTestServerInfo(Arrays.asList(PLAINTEXT, 
CONTROLLER)));
+        assertEquals(new HashSet<>(Arrays.asList(PLAINTEXT, CONTROLLER)), 
futures2.keySet());
+        assertFalse(futures2.get(PLAINTEXT).toCompletableFuture().isDone());
+        assertTrue(futures2.get(CONTROLLER).toCompletableFuture().isDone());
+    }
+
+    /**
+     * Test attempts to authorize prior to completeInitialLoad. During this 
time, only
+     * superusers can be authorized. Other users will get an 
AuthorizerNotReadyException
+     * exception. Not even an authorization result, just an exception thrown 
for the whole
+     * batch.
+     */
+    @Test
+    public void testAuthorizationPriorToCompleteInitialLoad() throws Exception 
{
+        StandardAuthorizer authorizer = new StandardAuthorizer();
+        authorizer.configure(Collections.singletonMap(SUPER_USERS_CONFIG, 
"User:superman"));
+        assertThrows(AuthorizerNotReadyException.class, () ->
+            authorizer.authorize(new MockAuthorizableRequestContext.Builder().
+                    setPrincipal(new KafkaPrincipal(USER_TYPE, "bob")).build(),
+                Arrays.asList(newAction(READ, TOPIC, "green1"),
+                    newAction(READ, TOPIC, "green2"))));
+        assertEquals(Arrays.asList(ALLOWED, ALLOWED),
+            authorizer.authorize(new MockAuthorizableRequestContext.Builder().
+                    setPrincipal(new KafkaPrincipal(USER_TYPE, 
"superman")).build(),
+                Arrays.asList(newAction(READ, TOPIC, "green1"),
+                    newAction(WRITE, GROUP, "wheel"))));
+    }
+
+    @Test
+    public void testCompleteInitialLoad() throws Exception {
+        StandardAuthorizer authorizer = new StandardAuthorizer();
+        authorizer.configure(Collections.singletonMap(SUPER_USERS_CONFIG, 
"User:superman"));
+        Map<Endpoint, ? extends CompletionStage<Void>> futures = authorizer.
+            start(new 
AuthorizerTestServerInfo(Collections.singleton(PLAINTEXT)));
+        assertEquals(Collections.singleton(PLAINTEXT), futures.keySet());
+        assertFalse(futures.get(PLAINTEXT).toCompletableFuture().isDone());
+        authorizer.completeInitialLoad();
+        assertTrue(futures.get(PLAINTEXT).toCompletableFuture().isDone());
+        
assertFalse(futures.get(PLAINTEXT).toCompletableFuture().isCompletedExceptionally());
+    }
+
+    @Test
+    public void testCompleteInitialLoadWithException() throws Exception {
+        StandardAuthorizer authorizer = new StandardAuthorizer();
+        authorizer.configure(Collections.singletonMap(SUPER_USERS_CONFIG, 
"User:superman"));
+        Map<Endpoint, ? extends CompletionStage<Void>> futures = authorizer.
+            start(new AuthorizerTestServerInfo(Arrays.asList(PLAINTEXT, 
CONTROLLER)));
+        assertEquals(new HashSet<>(Arrays.asList(PLAINTEXT, CONTROLLER)), 
futures.keySet());
+        assertFalse(futures.get(PLAINTEXT).toCompletableFuture().isDone());
+        assertTrue(futures.get(CONTROLLER).toCompletableFuture().isDone());
+        authorizer.completeInitialLoad(new TimeoutException("timed out"));
+        assertTrue(futures.get(PLAINTEXT).toCompletableFuture().isDone());
+        
assertTrue(futures.get(PLAINTEXT).toCompletableFuture().isCompletedExceptionally());
+        assertTrue(futures.get(CONTROLLER).toCompletableFuture().isDone());
+        
assertFalse(futures.get(CONTROLLER).toCompletableFuture().isCompletedExceptionally());
+    }
 }
diff --git 
a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java 
b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
index 855fd468cba..3c495adff92 100644
--- a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
+++ b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
@@ -77,10 +77,10 @@ public final class LocalLogManager implements 
RaftClient<ApiMessageAndVersion>,
         int size();
     }
 
-    static class LeaderChangeBatch implements LocalBatch {
+    public static class LeaderChangeBatch implements LocalBatch {
         private final LeaderAndEpoch newLeader;
 
-        LeaderChangeBatch(LeaderAndEpoch newLeader) {
+        public LeaderChangeBatch(LeaderAndEpoch newLeader) {
             this.newLeader = newLeader;
         }
 
@@ -113,12 +113,12 @@ public final class LocalLogManager implements 
RaftClient<ApiMessageAndVersion>,
         }
     }
 
-    static class LocalRecordBatch implements LocalBatch {
+    public static class LocalRecordBatch implements LocalBatch {
         private final int leaderEpoch;
         private final long appendTimestamp;
         private final List<ApiMessageAndVersion> records;
 
-        LocalRecordBatch(int leaderEpoch, long appendTimestamp, 
List<ApiMessageAndVersion> records) {
+        public LocalRecordBatch(int leaderEpoch, long appendTimestamp, 
List<ApiMessageAndVersion> records) {
             this.leaderEpoch = leaderEpoch;
             this.appendTimestamp = appendTimestamp;
             this.records = records;
@@ -184,6 +184,11 @@ public final class LocalLogManager implements 
RaftClient<ApiMessageAndVersion>,
          */
         private long prevOffset;
 
+        /**
+         * The initial max read offset which LocalLog instances will be 
configured with.
+         */
+        private long initialMaxReadOffset = Long.MAX_VALUE;
+
         /**
          * Maps committed offset to snapshot reader.
          */
@@ -237,7 +242,7 @@ public final class LocalLogManager implements 
RaftClient<ApiMessageAndVersion>,
             return offset;
         }
 
-        synchronized long append(LocalBatch batch) {
+        public synchronized long append(LocalBatch batch) {
            prevOffset += batch.size();
             log.debug("append(batch={}, prevOffset={})", batch, prevOffset);
             batches.put(prevOffset, batch);
@@ -352,6 +362,15 @@ public final class LocalLogManager implements 
RaftClient<ApiMessageAndVersion>,
                 })
                 .sum();
         }
+
+        public SharedLogData setInitialMaxReadOffset(long 
initialMaxReadOffset) {
+            this.initialMaxReadOffset = initialMaxReadOffset;
+            return this;
+        }
+
+        public long initialMaxReadOffset() {
+            return initialMaxReadOffset;
+        }
     }
 
     private static class MetaLogListenerData {
@@ -454,6 +473,7 @@ public final class LocalLogManager implements 
RaftClient<ApiMessageAndVersion>,
         this.log = logContext.logger(LocalLogManager.class);
         this.nodeId = nodeId;
         this.shared = shared;
+        this.maxReadOffset = shared.initialMaxReadOffset();
         this.eventQueue = new KafkaEventQueue(Time.SYSTEM, logContext, 
threadNamePrefix);
         shared.registerLogManager(this);
     }
@@ -502,6 +522,8 @@ public final class LocalLogManager implements 
RaftClient<ApiMessageAndVersion>,
                             // Only notify the listener if it equals the 
shared leader state
                             LeaderAndEpoch sharedLeader = 
shared.leaderAndEpoch();
                             if (batch.newLeader.equals(sharedLeader)) {
+                                log.debug("Node {}: Executing 
handleLeaderChange {}",
+                                    nodeId, sharedLeader);
                                 listenerData.handleLeaderChange(entryOffset, 
batch.newLeader);
                                 if (batch.newLeader.epoch() > leader.epoch()) {
                                     leader = batch.newLeader;
@@ -658,6 +680,15 @@ public final class LocalLogManager implements 
RaftClient<ApiMessageAndVersion>,
         });
     }
 
+    @Override
+    public synchronized OptionalLong highWatermark() {
+        if (shared.prevOffset > 0) {
+            return OptionalLong.of(shared.prevOffset);
+        } else {
+            return OptionalLong.empty();
+        }
+    }
+
     @Override
     public long scheduleAppend(int epoch, List<ApiMessageAndVersion> batch) {
         if (batch.isEmpty()) {
diff --git 
a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTestEnv.java 
b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTestEnv.java
index ed18ab6053d..17c9c467124 100644
--- 
a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTestEnv.java
+++ 
b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTestEnv.java
@@ -20,8 +20,11 @@ package org.apache.kafka.metalog;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.metalog.LocalLogManager.LeaderChangeBatch;
+import org.apache.kafka.metalog.LocalLogManager.LocalRecordBatch;
 import org.apache.kafka.metalog.LocalLogManager.SharedLogData;
 import org.apache.kafka.raft.LeaderAndEpoch;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.snapshot.RawSnapshotReader;
 import org.apache.kafka.test.TestUtils;
 import org.slf4j.Logger;
@@ -32,7 +35,9 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
+import java.util.OptionalInt;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
 
 public class LocalLogManagerTestEnv implements AutoCloseable {
     private static final Logger log =
@@ -77,10 +82,15 @@ public class LocalLogManagerTestEnv implements 
AutoCloseable {
         return testEnv;
     }
 
-    public LocalLogManagerTestEnv(int numManagers, Optional<RawSnapshotReader> 
snapshot) throws Exception {
+    public LocalLogManagerTestEnv(
+        int numManagers,
+        Optional<RawSnapshotReader> snapshot,
+        Consumer<SharedLogData> dataSetup
+    ) throws Exception {
         clusterId = Uuid.randomUuid().toString();
         dir = TestUtils.tempDirectory();
         shared = new SharedLogData(snapshot);
+        dataSetup.accept(shared);
         List<LocalLogManager> newLogManagers = new ArrayList<>(numManagers);
         try {
             for (int nodeId = 0; nodeId < numManagers; nodeId++) {
@@ -102,6 +112,28 @@ public class LocalLogManagerTestEnv implements 
AutoCloseable {
         this.logManagers = newLogManagers;
     }
 
+    public LocalLogManagerTestEnv(
+        int numManagers,
+        Optional<RawSnapshotReader> snapshot
+    ) throws Exception {
+        this(numManagers, snapshot, __ -> { });
+    }
+
+    /**
+     * Append some records to the log. This method is meant to be called 
before the
+     * controllers are started, to simulate a pre-existing metadata log.
+     *
+     * @param records   The records to be appended. Will be added in a single 
batch.
+     */
+    public void appendInitialRecords(List<ApiMessageAndVersion> records) {
+        int initialLeaderEpoch = 1;
+        shared.append(new LeaderChangeBatch(
+            new LeaderAndEpoch(OptionalInt.empty(), initialLeaderEpoch + 1)));
+        shared.append(new LocalRecordBatch(initialLeaderEpoch + 1, 0, 
records));
+        shared.append(new LeaderChangeBatch(
+            new LeaderAndEpoch(OptionalInt.of(0), initialLeaderEpoch + 2)));
+    }
+
     public String clusterId() {
         return clusterId;
     }
diff --git a/raft/src/main/java/org/apache/kafka/raft/RaftClient.java 
b/raft/src/main/java/org/apache/kafka/raft/RaftClient.java
index 8e4f50e7488..952cc60a387 100644
--- a/raft/src/main/java/org/apache/kafka/raft/RaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/RaftClient.java
@@ -24,6 +24,7 @@ import org.apache.kafka.snapshot.SnapshotWriter;
 import java.util.List;
 import java.util.Optional;
 import java.util.OptionalInt;
+import java.util.OptionalLong;
 import java.util.concurrent.CompletableFuture;
 
 public interface RaftClient<T> extends AutoCloseable {
@@ -113,6 +114,11 @@ public interface RaftClient<T> extends AutoCloseable {
      */
     void unregister(Listener<T> listener);
 
+    /**
+     * Returns the current high water mark, or OptionalLong.empty if it is not 
known.
+     */
+    OptionalLong highWatermark();
+
     /**
      * Return the current {@link LeaderAndEpoch}.
      *

Reply via email to