This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 390f126972b KAFKA-20083: Move BrokerLifecycleManager to server module (#21336)
390f126972b is described below

commit 390f126972bd9d7609a0bbc40c0bc388d0be4b1e
Author: Mickael Maison <[email protected]>
AuthorDate: Tue Jan 27 17:47:23 2026 +0100

    KAFKA-20083: Move BrokerLifecycleManager to server module (#21336)
    
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../kafka/server/BrokerLifecycleManager.scala      | 622 ------------------
 .../src/main/scala/kafka/server/BrokerServer.scala |   4 +-
 core/src/main/scala/kafka/server/KafkaConfig.scala |   4 -
 .../server/FetchFromFollowerIntegrationTest.scala  |   2 +-
 .../kafka/server/BrokerLifecycleManagerTest.scala  |  20 +-
 .../kafka/server/DescribeClusterRequestTest.scala  |   2 +-
 .../kafka/server/BrokerLifecycleManager.java       | 722 +++++++++++++++++++++
 .../kafka/server/config/AbstractKafkaConfig.java   |  16 +
 8 files changed, 753 insertions(+), 639 deletions(-)

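For readers skimming the diff below: the relocated class keeps the old construction pattern, except that the log directory IDs are now a java.util.Set and the shutdown hook is a Runnable. The following is a minimal sketch of a call site for the new org.apache.kafka.server.BrokerLifecycleManager; the method name, the "config" parameter, and the Uuid literal are illustrative only and not part of this commit (the pattern mirrors the BrokerLifecycleManagerTest changes further down).

    import java.util.Set;

    import org.apache.kafka.common.Uuid;
    import org.apache.kafka.common.utils.Time;
    import org.apache.kafka.server.BrokerLifecycleManager;
    import org.apache.kafka.server.config.AbstractKafkaConfig;

    // Sketch only: "config" stands in for the broker's AbstractKafkaConfig instance.
    void createAndCloseManager(AbstractKafkaConfig config) throws InterruptedException {
        BrokerLifecycleManager manager = new BrokerLifecycleManager(
            config,                                             // supplies node id, rack, timeouts
            Time.SYSTEM,                                        // clock used for event queue deadlines
            "broker-" + config.nodeId() + "-",                  // event queue thread name prefix
            Set.of(Uuid.fromString("oFoTeS9QT0aAyCyH41v45A")),  // log directory IDs (illustrative)
            () -> { });                                         // shutdown hook (no-op here)
        manager.close();                                        // close() now declares InterruptedException
    }
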
diff --git a/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala b/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
deleted file mode 100644
index 2368ebc21cc..00000000000
--- a/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
+++ /dev/null
@@ -1,622 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.server
-
-import java.util
-import java.util.concurrent.TimeUnit.{MILLISECONDS, NANOSECONDS}
-import java.util.concurrent.CompletableFuture
-import kafka.utils.Logging
-import org.apache.kafka.clients.ClientResponse
-import org.apache.kafka.common.Uuid
-import org.apache.kafka.common.message.BrokerRegistrationRequestData.ListenerCollection
-import org.apache.kafka.common.message.{BrokerHeartbeatRequestData, BrokerRegistrationRequestData}
-import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.{BrokerHeartbeatRequest, BrokerHeartbeatResponse, BrokerRegistrationRequest, BrokerRegistrationResponse}
-import org.apache.kafka.metadata.{BrokerState, VersionRange}
-import org.apache.kafka.queue.EventQueue.DeadlineFunction
-import org.apache.kafka.common.utils.{LogContext, Time}
-import org.apache.kafka.queue.{EventQueue, KafkaEventQueue}
-import org.apache.kafka.server.common.{ControllerRequestCompletionHandler, NodeToControllerChannelManager}
-
-import java.util.{Comparator, OptionalLong}
-import scala.collection.mutable
-import scala.jdk.CollectionConverters._
-
-/**
- * The broker lifecycle manager owns the broker state.
- *
- * Its inputs are messages passed in from other parts of the broker and from the
- * controller: requests to start up, or shut down, for example. Its output are the broker
- * state and various futures that can be used to wait for broker state transitions to
- * occur.
- *
- * The lifecycle manager handles registering the broker with the controller, as described
- * in KIP-631. After registration is complete, it handles sending periodic broker
- * heartbeats and processing the responses.
- *
- * This code uses an event queue paradigm. Modifications get translated into events, which
- * are placed on the queue to be processed sequentially. As described in the JavaDoc for
- * each variable, most mutable state can be accessed only from that event queue thread.
- * In some cases we expose a volatile variable which can be read from any thread, but only
- * written from the event queue thread.
- */
-class BrokerLifecycleManager(
-  val config: KafkaConfig,
-  val time: Time,
-  val threadNamePrefix: String,
-  val logDirs: Set[Uuid],
-  val shutdownHook: () => Unit = () => {}
-) extends Logging {
-
-  private def logPrefix(): String = {
-    val builder = new StringBuilder("[BrokerLifecycleManager")
-    builder.append(" id=").append(config.nodeId)
-    builder.append("] ")
-    builder.toString()
-  }
-
-  val logContext = new LogContext(logPrefix())
-
-  this.logIdent = logContext.logPrefix()
-
-  /**
-   * The broker id.
-   */
-  private val nodeId = config.nodeId
-
-  /**
-   * The broker rack, or null if there is no configured rack.
-   */
-  private val rack = config.rack
-
-  /**
-   * How long to wait for registration to succeed before failing the startup process.
-   */
-  private val initialTimeoutNs =
-    MILLISECONDS.toNanos(config.initialRegistrationTimeoutMs.longValue())
-
-  /**
-   * The broker incarnation ID.  This ID uniquely identifies each time we start the broker
-   */
-  val incarnationId: Uuid = Uuid.randomUuid()
-
-  /**
-   * A future which is completed just as soon as the broker has caught up with the latest
-   * metadata offset for the first time.
-   */
-  val initialCatchUpFuture: CompletableFuture[Void] = new CompletableFuture[Void]()
-
-  /**
-   * A future which is completed when the broker is unfenced for the first time.
-   */
-  val initialUnfenceFuture: CompletableFuture[Void] = new CompletableFuture[Void]()
-
-  /**
-   * A future which is completed when controlled shutdown is done.
-   */
-  val controlledShutdownFuture: CompletableFuture[Void] = new CompletableFuture[Void]()
-
-  /**
-   * The broker epoch, or -1 if the broker has not yet registered.  This variable can only
-   * be written from the event queue thread.
-   */
-  @volatile private var _brokerEpoch = -1L
-
-  /**
-   * The current broker state.  This variable can only be written from the event queue
-   * thread.
-   */
-  @volatile private var _state = BrokerState.NOT_RUNNING
-
-  /**
-   * A thread-safe callback function which gives this manager the current highest metadata
-   * offset.  This variable can only be read or written from the event queue thread.
-   */
-  private var _highestMetadataOffsetProvider: () => Long = _
-
-  /**
-   * True only if we are ready to unfence the broker.  This variable can only be read or
-   * written from the event queue thread.
-   */
-  private var readyToUnfence = false
-
-  /**
-   * Map of accumulated offline directories. The value is true if the directory couldn't be communicated
-   * to the Controller.
-   * This variable can only be read or written from the event queue thread.
-   */
-  private var offlineDirs = mutable.Map[Uuid, Boolean]()
-
-  /**
-   * True if we sent an event queue to the active controller requesting controlled
-   * shutdown.  This variable can only be read or written from the event queue thread.
-   */
-  private var gotControlledShutdownResponse = false
-
-  /**
-   * Whether or not this broker is registered with the controller quorum.
-   * This variable can only be read or written from the event queue thread.
-   */
-  private var registered = false
-
-  /**
-   * True if a request has been sent and a response or timeout has not yet been processed.
-   * This variable can only be read or written from the event queue thread.
-   */
-  private var communicationInFlight = false
-
-  /**
-   * True if we should schedule the next communication immediately. This is used to delay
-   * an immediate scheduling of a communication event if one is already in flight.
-   * This variable can only be read or written from the event queue thread.
-   */
-  private var nextSchedulingShouldBeImmediate = false
-
-  /**
-   * True if the initial registration succeeded.  This variable can only be read or
-   * written from the event queue thread.
-   */
-  private var initialRegistrationSucceeded = false
-
-  /**
-   * The cluster ID, or null if this manager has not been started yet.  This variable can
-   * only be read or written from the event queue thread.
-   */
-  private var _clusterId: String = _
-
-  /**
-   * The listeners which this broker advertises.  This variable can only be read or
-   * written from the event queue thread.
-   */
-  private var _advertisedListeners: ListenerCollection = _
-
-  /**
-   * The features supported by this broker.  This variable can only be read or written
-   * from the event queue thread.
-   */
-  private var _supportedFeatures: util.Map[String, VersionRange] = _
-
-  /**
-   * The channel manager, or null if this manager has not been started yet.  This variable
-   * can only be read or written from the event queue thread.
-   */
-  private var _channelManager: NodeToControllerChannelManager = _
-
-  /**
-   * The broker epoch from the previous run, or empty if the epoch is not found.
-   */
-  @volatile private var previousBrokerEpoch: OptionalLong = OptionalLong.empty()
-
-  /**
-   * The event queue.
-   */
-  private[server] val eventQueue = new KafkaEventQueue(time,
-    logContext,
-    threadNamePrefix + "lifecycle-manager-",
-    new ShutdownEvent())
-
-  /**
-   * Start the BrokerLifecycleManager.
-   *
-   * @param highestMetadataOffsetProvider Provides the current highest metadata offset.
-   * @param channelManager                The NodeToControllerChannelManager to use.
-   * @param clusterId                     The cluster ID.
-   * @param advertisedListeners           The advertised listeners for this broker.
-   * @param supportedFeatures             The features for this broker.
-   * @param previousBrokerEpoch           The broker epoch before the reboot.
-   *
-   */
-  def start(highestMetadataOffsetProvider: () => Long,
-            channelManager: NodeToControllerChannelManager,
-            clusterId: String,
-            advertisedListeners: ListenerCollection,
-            supportedFeatures: util.Map[String, VersionRange],
-            previousBrokerEpoch: OptionalLong): Unit = {
-    this.previousBrokerEpoch = previousBrokerEpoch
-    eventQueue.append(new StartupEvent(highestMetadataOffsetProvider,
-      channelManager, clusterId, advertisedListeners, supportedFeatures))
-  }
-
-  def setReadyToUnfence(): CompletableFuture[Void] = {
-    eventQueue.append(new SetReadyToUnfenceEvent())
-    initialUnfenceFuture
-  }
-
-  /**
-   * Propagate directory failures to the controller.
-   * @param directory The ID for the directory that failed.
-   */
-  def propagateDirectoryFailure(directory: Uuid, timeout: Long): Unit = {
-    eventQueue.append(new OfflineDirEvent(directory))
-    // If we can't communicate the offline directory to the controller, we should shut down.
-    eventQueue.scheduleDeferred("offlineDirFailure",
-      new DeadlineFunction(time.nanoseconds() + MILLISECONDS.toNanos(timeout)),
-      new OfflineDirBrokerFailureEvent(directory))
-  }
-
-  def resendBrokerRegistration(): Unit = {
-    eventQueue.append(new ResendBrokerRegistrationEvent())
-  }
-
-  private class ResendBrokerRegistrationEvent extends EventQueue.Event {
-    override def run(): Unit = {
-      registered = false
-      scheduleNextCommunicationImmediately()
-    }
-  }
-
-  def brokerEpoch: Long = _brokerEpoch
-
-  def state: BrokerState = _state
-
-  private class BeginControlledShutdownEvent extends EventQueue.Event {
-    override def run(): Unit = {
-      _state match {
-        case BrokerState.PENDING_CONTROLLED_SHUTDOWN =>
-          info("Attempted to enter pending controlled shutdown state, but we are " +
-          "already in that state.")
-        case BrokerState.RUNNING =>
-          info("Beginning controlled shutdown.")
-          _state = BrokerState.PENDING_CONTROLLED_SHUTDOWN
-          // Send the next heartbeat immediately in order to let the controller
-          // begin processing the controlled shutdown as soon as possible.
-          scheduleNextCommunicationImmediately()
-
-        case _ =>
-          info(s"Skipping controlled shutdown because we are in state ${_state}.")
-          beginShutdown()
-      }
-    }
-  }
-
-  /**
-   * Enter the controlled shutdown state if we are in RUNNING state.
-   * Or, if we're not running, shut down immediately.
-   */
-  def beginControlledShutdown(): Unit = {
-    eventQueue.append(new BeginControlledShutdownEvent())
-  }
-
-  /**
-   * Start shutting down the BrokerLifecycleManager, but do not block.
-   */
-  def beginShutdown(): Unit = {
-    eventQueue.beginShutdown("beginShutdown")
-  }
-
-  /**
-   * Shut down the BrokerLifecycleManager and block until all threads are joined.
-   */
-  def close(): Unit = {
-    beginShutdown()
-    eventQueue.close()
-  }
-
-  private class SetReadyToUnfenceEvent extends EventQueue.Event {
-    override def run(): Unit = {
-      readyToUnfence = true
-      scheduleNextCommunicationImmediately()
-    }
-  }
-
-  private class OfflineDirEvent(val dir: Uuid) extends EventQueue.Event {
-    override def run(): Unit = {
-      if (offlineDirs.isEmpty) {
-        offlineDirs = mutable.Map(dir -> false)
-      } else {
-        offlineDirs += (dir -> false)
-      }
-      if (registered) {
-        scheduleNextCommunicationImmediately()
-      }
-    }
-  }
-
-  private class OfflineDirBrokerFailureEvent(offlineDir: Uuid) extends EventQueue.Event {
-    override def run(): Unit = {
-      if (!offlineDirs.getOrElse(offlineDir, false)) {
-        error(s"Shutting down because couldn't communicate offline log dir $offlineDir with controllers")
-        shutdownHook()
-      }
-    }
-  }
-
-  private class StartupEvent(highestMetadataOffsetProvider: () => Long,
-                     channelManager: NodeToControllerChannelManager,
-                     clusterId: String,
-                     advertisedListeners: ListenerCollection,
-                     supportedFeatures: util.Map[String, VersionRange]) extends EventQueue.Event {
-    override def run(): Unit = {
-      _highestMetadataOffsetProvider = highestMetadataOffsetProvider
-      _channelManager = channelManager
-      _channelManager.start()
-      _state = BrokerState.STARTING
-      _clusterId = clusterId
-      _advertisedListeners = advertisedListeners.duplicate()
-      _supportedFeatures = new util.HashMap[String, VersionRange](supportedFeatures)
-      eventQueue.scheduleDeferred("initialRegistrationTimeout",
-        new DeadlineFunction(time.nanoseconds() + initialTimeoutNs),
-        new RegistrationTimeoutEvent())
-      sendBrokerRegistration()
-      info(s"Incarnation $incarnationId of broker $nodeId in cluster $clusterId " +
-        "is now STARTING.")
-    }
-  }
-
-  private def sendBrokerRegistration(): Unit = {
-    val features = new BrokerRegistrationRequestData.FeatureCollection()
-    _supportedFeatures.asScala.foreach {
-      case (name, range) => features.add(new BrokerRegistrationRequestData.Feature().
-        setName(name).
-        setMinSupportedVersion(range.min()).
-        setMaxSupportedVersion(range.max()))
-    }
-    val sortedLogDirs = new util.ArrayList[Uuid]
-    logDirs.foreach(sortedLogDirs.add)
-    sortedLogDirs.sort(new Comparator[Uuid]() {
-      override def compare(a: Uuid, b: Uuid): Int = a.compareTo(b)
-    })
-    val data = new BrokerRegistrationRequestData().
-        setBrokerId(nodeId).
-        setIsMigratingZkBroker(false).
-        setClusterId(_clusterId).
-        setFeatures(features).
-        setIncarnationId(incarnationId).
-        setListeners(_advertisedListeners).
-        setRack(rack.orNull).
-        setPreviousBrokerEpoch(previousBrokerEpoch.orElse(-1L)).
-        setLogDirs(sortedLogDirs)
-    if (isDebugEnabled) {
-      debug(s"Sending broker registration $data")
-    }
-    _channelManager.sendRequest(new BrokerRegistrationRequest.Builder(data),
-      new BrokerRegistrationResponseHandler())
-    communicationInFlight = true
-  }
-
-  // the response handler is not invoked from the event handler thread,
-  // so it is not safe to update state here, instead, schedule an event
-  // to continue handling the response on the event handler thread
-  private class BrokerRegistrationResponseHandler extends ControllerRequestCompletionHandler {
-    override def onComplete(response: ClientResponse): Unit = {
-      eventQueue.prepend(new BrokerRegistrationResponseEvent(response, false))
-    }
-
-    override def onTimeout(): Unit = {
-      info(s"Unable to register the broker because the RPC got timed out before it could be sent.")
-      eventQueue.prepend(new BrokerRegistrationResponseEvent(null, true))
-    }
-  }
-
-  private class BrokerRegistrationResponseEvent(response: ClientResponse, timedOut: Boolean) extends EventQueue.Event {
-    override def run(): Unit = {
-      communicationInFlight = false
-      if (timedOut) {
-        scheduleNextCommunicationAfterFailure()
-        return
-      }
-      if (response.authenticationException() != null) {
-        error(s"Unable to register broker $nodeId because of an authentication exception.",
-          response.authenticationException())
-        scheduleNextCommunicationAfterFailure()
-      } else if (response.versionMismatch() != null) {
-        error(s"Unable to register broker $nodeId because of an API version problem.",
-          response.versionMismatch())
-        scheduleNextCommunicationAfterFailure()
-      } else if (response.responseBody() == null) {
-        warn(s"Unable to register broker $nodeId.")
-        scheduleNextCommunicationAfterFailure()
-      } else if (!response.responseBody().isInstanceOf[BrokerRegistrationResponse]) {
-        error(s"Unable to register broker $nodeId because the controller returned an " +
-          "invalid response type.")
-        scheduleNextCommunicationAfterFailure()
-      } else {
-        val message = response.responseBody().asInstanceOf[BrokerRegistrationResponse]
-        val errorCode = Errors.forCode(message.data().errorCode())
-        if (errorCode == Errors.NONE) {
-          _brokerEpoch = message.data().brokerEpoch()
-          registered = true
-          initialRegistrationSucceeded = true
-          info(s"Successfully registered broker $nodeId with broker epoch ${_brokerEpoch}")
-          scheduleNextCommunicationImmediately() // Immediately send a heartbeat
-        } else {
-          info(s"Unable to register broker $nodeId because the controller returned " +
-            s"error $errorCode")
-          scheduleNextCommunicationAfterFailure()
-        }
-      }
-    }
-  }
-
-  private def sendBrokerHeartbeat(): Unit = {
-    val metadataOffset = _highestMetadataOffsetProvider()
-    val data = new BrokerHeartbeatRequestData().
-      setBrokerEpoch(_brokerEpoch).
-      setBrokerId(nodeId).
-      setCurrentMetadataOffset(metadataOffset).
-      setWantFence(!readyToUnfence).
-      setWantShutDown(_state == BrokerState.PENDING_CONTROLLED_SHUTDOWN).
-      setOfflineLogDirs(offlineDirs.keys.toSeq.asJava)
-    if (isTraceEnabled) {
-      trace(s"Sending broker heartbeat $data")
-    }
-    val handler = new BrokerHeartbeatResponseHandler(offlineDirs.keys)
-    _channelManager.sendRequest(new BrokerHeartbeatRequest.Builder(data), handler)
-    communicationInFlight = true
-  }
-
-  // the response handler is not invoked from the event handler thread,
-  // so it is not safe to update state here, instead, schedule an event
-  // to continue handling the response on the event handler thread
-  private class BrokerHeartbeatResponseHandler(currentOfflineDirs: Iterable[Uuid]) extends ControllerRequestCompletionHandler {
-    override def onComplete(response: ClientResponse): Unit = {
-      eventQueue.prepend(new BrokerHeartbeatResponseEvent(response, false, currentOfflineDirs))
-    }
-
-    override def onTimeout(): Unit = {
-      info("Unable to send a heartbeat because the RPC got timed out before it could be sent.")
-      eventQueue.prepend(new BrokerHeartbeatResponseEvent(null, true, currentOfflineDirs))
-    }
-  }
-
-  private class BrokerHeartbeatResponseEvent(response: ClientResponse, timedOut: Boolean,
-                                             currentOfflineDirs: Iterable[Uuid]) extends EventQueue.Event {
-    override def run(): Unit = {
-      communicationInFlight = false
-      if (timedOut) {
-        scheduleNextCommunicationAfterFailure()
-        return
-      }
-      if (response.authenticationException() != null) {
-        error(s"Unable to send broker heartbeat for $nodeId because of an " +
-          "authentication exception.", response.authenticationException())
-        scheduleNextCommunicationAfterFailure()
-      } else if (response.versionMismatch() != null) {
-        error(s"Unable to send broker heartbeat for $nodeId because of an API " +
-          "version problem.", response.versionMismatch())
-        scheduleNextCommunicationAfterFailure()
-      } else if (response.responseBody() == null) {
-        warn(s"Unable to send broker heartbeat for $nodeId. Retrying.")
-        scheduleNextCommunicationAfterFailure()
-      } else if (!response.responseBody().isInstanceOf[BrokerHeartbeatResponse]) {
-        error(s"Unable to send broker heartbeat for $nodeId because the controller " +
-          "returned an invalid response type.")
-        scheduleNextCommunicationAfterFailure()
-      } else {
-        val message = response.responseBody().asInstanceOf[BrokerHeartbeatResponse]
-        val errorCode = Errors.forCode(message.data().errorCode())
-        if (errorCode == Errors.NONE) {
-          val responseData = message.data()
-          currentOfflineDirs.foreach(cur => offlineDirs.put(cur, true))
-          _state match {
-            case BrokerState.STARTING =>
-              if (responseData.isCaughtUp) {
-                info(s"The broker has caught up. Transitioning from STARTING to RECOVERY.")
-                _state = BrokerState.RECOVERY
-                initialCatchUpFuture.complete(null)
-              } else {
-                debug(s"The broker is STARTING. Still waiting to catch up with cluster metadata.")
-              }
-              // Schedule the heartbeat after only 10 ms so that in the case where
-              // there is no recovery work to be done, we start up a bit quicker.
-              scheduleNextCommunication(NANOSECONDS.convert(10, MILLISECONDS))
-            case BrokerState.RECOVERY =>
-              if (!responseData.isFenced) {
-                info(s"The broker has been unfenced. Transitioning from RECOVERY to RUNNING.")
-                initialUnfenceFuture.complete(null)
-                _state = BrokerState.RUNNING
-              } else {
-                info(s"The broker is in RECOVERY.")
-              }
-              scheduleNextCommunicationAfterSuccess()
-            case BrokerState.RUNNING =>
-              debug(s"The broker is RUNNING. Processing heartbeat response.")
-              scheduleNextCommunicationAfterSuccess()
-            case BrokerState.PENDING_CONTROLLED_SHUTDOWN =>
-              if (!responseData.shouldShutDown()) {
-                info(s"The broker is in PENDING_CONTROLLED_SHUTDOWN state, still waiting " +
-                  "for the active controller.")
-                if (!gotControlledShutdownResponse) {
-                  // If this is the first pending controlled shutdown response we got,
-                  // schedule our next heartbeat a little bit sooner than we usually would.
-                  // In the case where controlled shutdown completes quickly, this will
-                  // speed things up a little bit.
-                  scheduleNextCommunication(NANOSECONDS.convert(50, MILLISECONDS))
-                } else {
-                  scheduleNextCommunicationAfterSuccess()
-                }
-              } else {
-                info(s"The controller has asked us to exit controlled shutdown.")
-                beginShutdown()
-              }
-              gotControlledShutdownResponse = true
-            case BrokerState.SHUTTING_DOWN =>
-              info(s"The broker is SHUTTING_DOWN. Ignoring heartbeat response.")
-            case _ =>
-              error(s"Unexpected broker state ${_state}")
-              scheduleNextCommunicationAfterSuccess()
-          }
-        } else {
-          warn(s"Broker $nodeId sent a heartbeat request but received error $errorCode.")
-          scheduleNextCommunicationAfterFailure()
-        }
-      }
-    }
-  }
-
-  private def scheduleNextCommunicationImmediately(): Unit = {
-      scheduleNextCommunication(0)
-  }
-
-  private def scheduleNextCommunicationAfterFailure(): Unit = {
-    nextSchedulingShouldBeImmediate = false // never immediately reschedule after a failure
-    scheduleNextCommunication(NANOSECONDS.convert(
-      config.brokerHeartbeatIntervalMs.longValue() , MILLISECONDS))
-  }
-
-  private def scheduleNextCommunicationAfterSuccess(): Unit = {
-    scheduleNextCommunication(NANOSECONDS.convert(
-      config.brokerHeartbeatIntervalMs.longValue() , MILLISECONDS))
-  }
-
-  private def scheduleNextCommunication(intervalNs: Long): Unit = {
-    val adjustedIntervalNs = if (nextSchedulingShouldBeImmediate) 0 else intervalNs
-    nextSchedulingShouldBeImmediate = false
-    trace(s"Scheduling next communication at ${MILLISECONDS.convert(adjustedIntervalNs, NANOSECONDS)} " +
-      "ms from now.")
-    val deadlineNs = time.nanoseconds() + adjustedIntervalNs
-    eventQueue.scheduleDeferred("communication",
-      new DeadlineFunction(deadlineNs),
-      new CommunicationEvent())
-  }
-
-  private class RegistrationTimeoutEvent extends EventQueue.Event {
-    override def run(): Unit = {
-      if (!initialRegistrationSucceeded) {
-        error("Shutting down because we were unable to register with the controller quorum.")
-        eventQueue.beginShutdown("registrationTimeout")
-      }
-    }
-  }
-
-  private class CommunicationEvent extends EventQueue.Event {
-    override def run(): Unit = {
-      if (communicationInFlight) {
-        trace("Delaying communication because there is already one in flight.")
-        nextSchedulingShouldBeImmediate = true
-      } else if (registered) {
-        sendBrokerHeartbeat()
-      } else {
-        sendBrokerRegistration()
-      }
-    }
-  }
-
-  private class ShutdownEvent extends EventQueue.Event {
-    override def run(): Unit = {
-      info(s"Transitioning from ${_state} to ${BrokerState.SHUTTING_DOWN}.")
-      _state = BrokerState.SHUTTING_DOWN
-      controlledShutdownFuture.complete(null)
-      initialCatchUpFuture.cancel(false)
-      initialUnfenceFuture.cancel(false)
-      if (_channelManager != null) {
-        _channelManager.shutdown()
-        _channelManager = null
-      }
-    }
-  }
-}
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index d9cb3aca495..624d7a6ed89 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -54,7 +54,7 @@ import org.apache.kafka.server.share.persister.{DefaultStatePersister, NoOpState
 import org.apache.kafka.server.share.session.ShareSessionCache
 import org.apache.kafka.server.util.timer.{SystemTimer, SystemTimerReaper}
 import org.apache.kafka.server.util.{Deadline, FutureUtils, KafkaScheduler}
-import org.apache.kafka.server.{AssignmentsManager, BrokerFeatures, ClientMetricsManager, DefaultApiVersionManager, DelayedActionQueue, KRaftTopicCreator, NodeToControllerChannelManagerImpl, ProcessRole, RaftControllerNodeProvider}
+import org.apache.kafka.server.{AssignmentsManager, BrokerFeatures, BrokerLifecycleManager, ClientMetricsManager, DefaultApiVersionManager, DelayedActionQueue, KRaftTopicCreator, NodeToControllerChannelManagerImpl, ProcessRole, RaftControllerNodeProvider}
 import org.apache.kafka.server.transaction.AddPartitionsToTxnManager
 import org.apache.kafka.storage.internals.log.LogDirFailureChannel
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
@@ -220,7 +220,7 @@ class BrokerServer(
       lifecycleManager = new BrokerLifecycleManager(config,
         time,
         s"broker-${config.nodeId}-",
-        logDirs = logManager.directoryIdsSet,
+        logManager.directoryIdsSet.asJava,
         () => new Thread(() => shutdown(), "kafka-shutdown-thread").start())
 
       // Enable delegation token cache for all SCRAM mechanisms to simplify dynamic update.
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 65097f53a5c..8c3c6094e68 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -288,9 +288,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
   def quotaConfig: QuotaConfig = _quotaConfig
 
   /** ********* General Configuration ***********/
-  val nodeId: Int = getInt(KRaftConfigs.NODE_ID_CONFIG)
-  val initialRegistrationTimeoutMs: Int = getInt(KRaftConfigs.INITIAL_BROKER_REGISTRATION_TIMEOUT_MS_CONFIG)
-  val brokerHeartbeatIntervalMs: Int = getInt(KRaftConfigs.BROKER_HEARTBEAT_INTERVAL_MS_CONFIG)
   val brokerSessionTimeoutMs: Int = getInt(KRaftConfigs.BROKER_SESSION_TIMEOUT_MS_CONFIG)
   val controllerPerformanceSamplePeriodMs: Long = getLong(KRaftConfigs.CONTROLLER_PERFORMANCE_SAMPLE_PERIOD_MS)
   val controllerPerformanceAlwaysLogThresholdMs: Long = getLong(KRaftConfigs.CONTROLLER_PERFORMANCE_ALWAYS_LOG_THRESHOLD_MS)
@@ -379,7 +376,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
   def numNetworkThreads = getInt(SocketServerConfigs.NUM_NETWORK_THREADS_CONFIG)
 
   /***************** rack configuration **************/
-  val rack = Option(getString(ServerConfigs.BROKER_RACK_CONFIG))
   val replicaSelectorClassName = Option(getString(ReplicationConfigs.REPLICA_SELECTOR_CLASS_CONFIG))
 
   /** ********* Log Configuration ***********/
diff --git a/core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala b/core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala
index a734c464d0f..f9cef4bd39c 100644
--- a/core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala
@@ -197,7 +197,7 @@ class FetchFromFollowerIntegrationTest extends BaseFetchRequestTest {
     consumerConfig.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, classOf[RangeAssignor].getName)
     val consumers = brokers.map { server =>
       consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
-      consumerConfig.setProperty(ConsumerConfig.CLIENT_RACK_CONFIG, server.config.rack.orNull)
+      consumerConfig.setProperty(ConsumerConfig.CLIENT_RACK_CONFIG, server.config.rack.orElse(null))
       consumerConfig.setProperty(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, s"instance-${server.config.brokerId}")
       consumerConfig.setProperty(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000")
       consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
diff --git a/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala b/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
index 478cd74396c..86d587eae0f 100644
--- a/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
@@ -17,6 +17,7 @@
 
 package kafka.server
 
+import java.util
 import java.util.{Collections, OptionalLong, Properties}
 import kafka.utils.TestUtils
 import org.apache.kafka.common.Node
@@ -27,6 +28,7 @@ import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, Brok
 import org.apache.kafka.metadata.BrokerState
 import org.apache.kafka.raft.{KRaftConfigs, QuorumConfig}
 import org.apache.kafka.server.config.ServerLogConfigs
+import org.apache.kafka.server.BrokerLifecycleManager
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, Test, Timeout}
 
@@ -59,14 +61,14 @@ class BrokerLifecycleManagerTest {
   @Test
   def testCreateAndClose(): Unit = {
     val context = new RegistrationTestContext(configProperties)
-    manager = new BrokerLifecycleManager(context.config, context.time, "create-and-close-", Set(Uuid.fromString("oFoTeS9QT0aAyCyH41v45A")))
+    manager = new BrokerLifecycleManager(context.config, context.time, "create-and-close-", util.Set.of(Uuid.fromString("oFoTeS9QT0aAyCyH41v45A")))
     manager.close()
   }
 
   @Test
   def testCreateStartAndClose(): Unit = {
     val context = new RegistrationTestContext(configProperties)
-    manager = new BrokerLifecycleManager(context.config, context.time, "create-start-and-close-", Set(Uuid.fromString("uiUADXZWTPixVvp6UWFWnw")))
+    manager = new BrokerLifecycleManager(context.config, context.time, "create-start-and-close-", util.Set.of(Uuid.fromString("uiUADXZWTPixVvp6UWFWnw")))
     assertEquals(BrokerState.NOT_RUNNING, manager.state)
     manager.start(() => context.highestMetadataOffset.get(),
       context.mockChannelManager, context.clusterId, context.advertisedListeners,
@@ -81,7 +83,7 @@ class BrokerLifecycleManagerTest {
   @Test
   def testSuccessfulRegistration(): Unit = {
     val context = new RegistrationTestContext(configProperties)
-    manager = new BrokerLifecycleManager(context.config, context.time, "successful-registration-", Set(Uuid.fromString("gCpDJgRlS2CBCpxoP2VMsQ")))
+    manager = new BrokerLifecycleManager(context.config, context.time, "successful-registration-", util.Set.of(Uuid.fromString("gCpDJgRlS2CBCpxoP2VMsQ")))
     val controllerNode = new Node(3000, "localhost", 8021)
     context.controllerNodeProvider.node.set(controllerNode)
     manager.start(() => context.highestMetadataOffset.get(),
@@ -103,7 +105,7 @@ class BrokerLifecycleManagerTest {
   def testRegistrationTimeout(): Unit = {
     val context = new RegistrationTestContext(configProperties)
     val controllerNode = new Node(3000, "localhost", 8021)
-    manager = new BrokerLifecycleManager(context.config, context.time, "registration-timeout-", Set(Uuid.fromString("9XBOAtr4T0Wbx2sbiWh6xg")))
+    manager = new BrokerLifecycleManager(context.config, context.time, "registration-timeout-", util.Set.of(Uuid.fromString("9XBOAtr4T0Wbx2sbiWh6xg")))
     context.controllerNodeProvider.node.set(controllerNode)
     def newDuplicateRegistrationResponse(): Unit = {
       context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse(
@@ -143,7 +145,7 @@ class BrokerLifecycleManagerTest {
   @Test
   def testControlledShutdown(): Unit = {
     val context = new RegistrationTestContext(configProperties)
-    manager = new BrokerLifecycleManager(context.config, context.time, "controlled-shutdown-", Set(Uuid.fromString("B4RtUz1ySGip3A7ZFYB2dg")))
+    manager = new BrokerLifecycleManager(context.config, context.time, "controlled-shutdown-", util.Set.of(Uuid.fromString("B4RtUz1ySGip3A7ZFYB2dg")))
     val controllerNode = new Node(3000, "localhost", 8021)
     context.controllerNodeProvider.node.set(controllerNode)
     context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse(
@@ -224,7 +226,7 @@ class BrokerLifecycleManagerTest {
   @Test
   def testAlwaysSendsAccumulatedOfflineDirs(): Unit = {
     val ctx = new RegistrationTestContext(configProperties)
-    manager = new BrokerLifecycleManager(ctx.config, ctx.time, "offline-dirs-sent-in-heartbeat-", Set(Uuid.fromString("0IbF1sjhSGG6FNvnrPbqQg")))
+    manager = new BrokerLifecycleManager(ctx.config, ctx.time, "offline-dirs-sent-in-heartbeat-", util.Set.of(Uuid.fromString("0IbF1sjhSGG6FNvnrPbqQg")))
     val controllerNode = new Node(3000, "localhost", 8021)
     ctx.controllerNodeProvider.node.set(controllerNode)
 
@@ -248,7 +250,7 @@ class BrokerLifecycleManagerTest {
 
   @Test
   def testRegistrationIncludesDirs(): Unit = {
-    val logDirs = Set("ad5FLIeCTnaQdai5vOjeng", "ybdzUKmYSLK6oiIpI6CPlw").map(Uuid.fromString)
+    val logDirs = util.Set.of(Uuid.fromString("ad5FLIeCTnaQdai5vOjeng"), Uuid.fromString("ybdzUKmYSLK6oiIpI6CPlw"))
     val ctx = new RegistrationTestContext(configProperties)
     manager = new BrokerLifecycleManager(ctx.config, ctx.time, "registration-includes-dirs-", logDirs)
     val controllerNode = new Node(3000, "localhost", 8021)
@@ -261,13 +263,13 @@ class BrokerLifecycleManagerTest {
       Collections.emptyMap(), OptionalLong.empty())
     val request = poll(ctx, manager, registration).asInstanceOf[BrokerRegistrationRequest]
 
-    assertEquals(logDirs, request.data.logDirs().asScala.toSet)
+    assertEquals(logDirs, new util.HashSet(request.data.logDirs()))
   }
 
   @Test
   def testKraftJBODMetadataVersionUpdateEvent(): Unit = {
     val ctx = new RegistrationTestContext(configProperties)
-    manager = new BrokerLifecycleManager(ctx.config, ctx.time, "jbod-metadata-version-update", Set(Uuid.fromString("gCpDJgRlS2CBCpxoP2VMsQ")))
+    manager = new BrokerLifecycleManager(ctx.config, ctx.time, "jbod-metadata-version-update", util.Set.of(Uuid.fromString("gCpDJgRlS2CBCpxoP2VMsQ")))
 
     val controllerNode = new Node(3000, "localhost", 8021)
     ctx.controllerNodeProvider.node.set(controllerNode)
diff --git a/core/src/test/scala/unit/kafka/server/DescribeClusterRequestTest.scala b/core/src/test/scala/unit/kafka/server/DescribeClusterRequestTest.scala
index 1d3048cec6a..96283d60389 100644
--- a/core/src/test/scala/unit/kafka/server/DescribeClusterRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DescribeClusterRequestTest.scala
@@ -62,7 +62,7 @@ class DescribeClusterRequestTest extends BaseRequestTest {
         .setBrokerId(server.config.brokerId)
         .setHost("localhost")
         .setPort(server.socketServer.boundPort(listenerName))
-        .setRack(server.config.rack.orNull)
+        .setRack(server.config.rack.orElse(null))
     }.toSet
 
     val expectedClusterId = brokers.last.clusterId
diff --git a/server/src/main/java/org/apache/kafka/server/BrokerLifecycleManager.java b/server/src/main/java/org/apache/kafka/server/BrokerLifecycleManager.java
new file mode 100644
index 00000000000..e148fb1c0b1
--- /dev/null
+++ b/server/src/main/java/org/apache/kafka/server/BrokerLifecycleManager.java
@@ -0,0 +1,722 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
+import org.apache.kafka.common.message.BrokerHeartbeatResponseData;
+import org.apache.kafka.common.message.BrokerRegistrationRequestData;
+import org.apache.kafka.common.message.BrokerRegistrationRequestData.ListenerCollection;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.BrokerHeartbeatRequest;
+import org.apache.kafka.common.requests.BrokerHeartbeatResponse;
+import org.apache.kafka.common.requests.BrokerRegistrationRequest;
+import org.apache.kafka.common.requests.BrokerRegistrationResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.metadata.BrokerState;
+import org.apache.kafka.metadata.VersionRange;
+import org.apache.kafka.queue.EventQueue;
+import org.apache.kafka.queue.KafkaEventQueue;
+import org.apache.kafka.server.common.ControllerRequestCompletionHandler;
+import org.apache.kafka.server.common.NodeToControllerChannelManager;
+import org.apache.kafka.server.config.AbstractKafkaConfig;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+
+/**
+ * The broker lifecycle manager owns the broker state.
+ *
+ * Its inputs are messages passed in from other parts of the broker and from the
+ * controller: requests to start up, or shut down, for example. Its output are the broker
+ * state and various futures that can be used to wait for broker state transitions to
+ * occur.
+ *
+ * The lifecycle manager handles registering the broker with the controller, as described
+ * in KIP-631. After registration is complete, it handles sending periodic broker
+ * heartbeats and processing the responses.
+ *
+ * This code uses an event queue paradigm. Modifications get translated into events, which
+ * are placed on the queue to be processed sequentially. As described in the JavaDoc for
+ * each variable, most mutable state can be accessed only from that event queue thread.
+ * In some cases we expose a volatile variable which can be read from any thread, but only
+ * written from the event queue thread.
+ */
+public class BrokerLifecycleManager {
+
+    private final Logger logger;
+    private final KafkaEventQueue eventQueue;
+    private final AbstractKafkaConfig config;
+    private final Time time;
+    private final Set<Uuid> logDirs;
+    private final Runnable shutdownHook;
+
+    /**
+     * The broker id.
+     */
+    private final int nodeId;
+
+    /**
+     * The broker rack, or null if there is no configured rack.
+     */
+    private final Optional<String> rack;
+
+    /**
+     * How long to wait for registration to succeed before failing the startup process.
+     */
+    private final long initialTimeoutNs;
+
+    /**
+     * The broker incarnation ID.  This ID uniquely identifies each time we start the broker
+     */
+    private final Uuid incarnationId = Uuid.randomUuid();
+
+    /**
+     * A future which is completed just as soon as the broker has caught up with the latest
+     * metadata offset for the first time.
+     */
+    private final CompletableFuture<Void> initialCatchUpFuture = new CompletableFuture<>();
+
+    /**
+     * A future which is completed when the broker is unfenced for the first time.
+     */
+    private final CompletableFuture<Void> initialUnfenceFuture = new CompletableFuture<>();
+
+    /**
+     * A future which is completed when controlled shutdown is done.
+     */
+    private final CompletableFuture<Void> controlledShutdownFuture = new CompletableFuture<>();
+
+    /**
+     * The broker epoch, or -1 if the broker has not yet registered.  This variable can only
+     * be written from the event queue thread.
+     */
+    private volatile long brokerEpoch = -1L;
+
+    /**
+     * The current broker state.  This variable can only be written from the event queue
+     * thread.
+     */
+    private volatile BrokerState state = BrokerState.NOT_RUNNING;
+
+    /**
+     * A thread-safe callback function which gives this manager the current highest metadata
+     * offset.  This variable can only be read or written from the event queue thread.
+     */
+    private Supplier<Long> highestMetadataOffsetProvider;
+
+    /**
+     * True only if we are ready to unfence the broker.  This variable can only be read or
+     * written from the event queue thread.
+     */
+    private boolean readyToUnfence = false;
+
+    /**
+     * Map of accumulated offline directories. The value is true if the directory has been successfully communicated
+     * to the Controller.
+     * This variable can only be read or written from the event queue thread.
+     */
+    private Map<Uuid, Boolean> offlineDirs = new HashMap<>();
+
+    /**
+     * True if we sent an event queue to the active controller requesting controlled
+     * shutdown.  This variable can only be read or written from the event queue thread.
+     */
+    private boolean gotControlledShutdownResponse = false;
+
+    /**
+     * Whether this broker is registered with the controller quorum.
+     * This variable can only be read or written from the event queue thread.
+     */
+    private boolean registered = false;
+
+    /**
+     * True if a request has been sent and a response or timeout has not yet been processed.
+     * This variable can only be read or written from the event queue thread.
+     */
+    private boolean communicationInFlight = false;
+
+    /**
+     * True if we should schedule the next communication immediately. This is used to delay
+     * an immediate scheduling of a communication event if one is already in flight.
+     * This variable can only be read or written from the event queue thread.
+     */
+    private boolean nextSchedulingShouldBeImmediate = false;
+
+    /**
+     * True if the initial registration succeeded.  This variable can only be read or
+     * written from the event queue thread.
+     */
+    private boolean initialRegistrationSucceeded = false;
+
+    /**
+     * The cluster ID, or null if this manager has not been started yet.  This variable can
+     * only be read or written from the event queue thread.
+     */
+    private String clusterId;
+
+    /**
+     * The listeners which this broker advertises.  This variable can only be read or
+     * written from the event queue thread.
+     */
+    private ListenerCollection advertisedListeners;
+
+    /**
+     * The features supported by this broker.  This variable can only be read or written
+     * from the event queue thread.
+     */
+    private Map<String, VersionRange> supportedFeatures;
+
+    /**
+     * The channel manager, or null if this manager has not been started yet.  This variable
+     * can only be read or written from the event queue thread.
+     */
+    private NodeToControllerChannelManager channelManager;
+
+    /**
+     * The broker epoch from the previous run, or empty if the epoch is not found.
+     */
+    private volatile OptionalLong previousBrokerEpoch = OptionalLong.empty();
+
+    public BrokerLifecycleManager(
+            AbstractKafkaConfig config,
+            Time time,
+            String threadNamePrefix,
+            Set<Uuid> logDirs) {
+        this(config, time, threadNamePrefix, logDirs, () -> { });
+    }
+
+    public BrokerLifecycleManager(
+            AbstractKafkaConfig config,
+            Time time,
+            String threadNamePrefix,
+            Set<Uuid> logDirs,
+            Runnable shutdownHook) {
+        this.config = config;
+        this.time = time;
+        this.logDirs = logDirs;
+        this.shutdownHook = shutdownHook;
+        LogContext logContext = new LogContext("[BrokerLifecycleManager id=" + this.config.nodeId() + "] ");
+        this.logger = logContext.logger(BrokerLifecycleManager.class);
+        this.nodeId = config.nodeId();
+        this.rack = config.rack();
+        this.initialTimeoutNs = MILLISECONDS.toNanos(config.initialRegistrationTimeoutMs());
+        this.eventQueue = new KafkaEventQueue(
+                time,
+                logContext,
+                threadNamePrefix + "lifecycle-manager-",
+                new ShutdownEvent());
+    }
+
+    /**
+     * Start the BrokerLifecycleManager.
+     *
+     * @param highestMetadataOffsetProvider Provides the current highest metadata offset.
+     * @param channelManager                The NodeToControllerChannelManager to use.
+     * @param clusterId                     The cluster ID.
+     * @param advertisedListeners           The advertised listeners for this broker.
+     * @param supportedFeatures             The features for this broker.
+     * @param previousBrokerEpoch           The broker epoch before the reboot.
+     */
+    public void start(Supplier<Long> highestMetadataOffsetProvider,
+               NodeToControllerChannelManager channelManager,
+               String clusterId,
+               ListenerCollection advertisedListeners,
+               Map<String, VersionRange> supportedFeatures,
+               OptionalLong previousBrokerEpoch) {
+        this.previousBrokerEpoch = previousBrokerEpoch;
+        eventQueue.append(new StartupEvent(highestMetadataOffsetProvider,
+                channelManager, clusterId, advertisedListeners, supportedFeatures));
+    }
+
+    public CompletableFuture<Void> setReadyToUnfence() {
+        eventQueue.append(new SetReadyToUnfenceEvent());
+        return initialUnfenceFuture;
+    }
+
+    /**
+     * Propagate directory failures to the controller.
+     *
+     * @param directory The ID for the directory that failed.
+     */
+    public void propagateDirectoryFailure(Uuid directory, long timeout) {
+        eventQueue.append(new OfflineDirEvent(directory));
+        // If we can't communicate the offline directory to the controller, we should shut down.
+        eventQueue.scheduleDeferred("offlineDirFailure",
+                new EventQueue.DeadlineFunction(time.nanoseconds() + MILLISECONDS.toNanos(timeout)),
+                new OfflineDirBrokerFailureEvent(directory));
+    }
+
+    public void resendBrokerRegistration() {
+        eventQueue.append(new ResendBrokerRegistrationEvent());
+    }
+
+    private class ResendBrokerRegistrationEvent implements EventQueue.Event {
+        @Override
+        public void run() {
+            registered = false;
+            scheduleNextCommunicationImmediately();
+        }
+    }
+
+    public long brokerEpoch() {
+        return brokerEpoch;
+    }
+
+    public BrokerState state() {
+        return state;
+    }
+
+    public CompletableFuture<Void> initialCatchUpFuture() {
+        return initialCatchUpFuture;
+    }
+
+    public CompletableFuture<Void> initialUnfenceFuture() {
+        return initialUnfenceFuture;
+    }
+
+    public CompletableFuture<Void> controlledShutdownFuture() {
+        return controlledShutdownFuture;
+    }
+
+    public KafkaEventQueue eventQueue() {
+        return eventQueue;
+    }
+
+    private class BeginControlledShutdownEvent implements EventQueue.Event {
+        @Override
+        public void run() {
+            switch (state) {
+                case PENDING_CONTROLLED_SHUTDOWN ->
+                    logger.info("Attempted to enter pending controlled shutdown state, but we are already in that state.");
+                case RUNNING -> {
+                    logger.info("Beginning controlled shutdown.");
+                    state = BrokerState.PENDING_CONTROLLED_SHUTDOWN;
+                    // Send the next heartbeat immediately in order to let the controller
+                    // begin processing the controlled shutdown as soon as possible.
+                    scheduleNextCommunicationImmediately();
+                }
+                default -> {
+                    logger.info("Skipping controlled shutdown because we are in state {}.", state);
+                    beginShutdown();
+                }
+            }
+        }
+    }
+
+    /**
+     * Enter the controlled shutdown state if we are in RUNNING state.
+     * Or, if we're not running, shut down immediately.
+     */
+    public void beginControlledShutdown() {
+        eventQueue.append(new BeginControlledShutdownEvent());
+    }
+
+    /**
+     * Start shutting down the BrokerLifecycleManager, but do not block.
+     */
+    public void beginShutdown() {
+        eventQueue.beginShutdown("beginShutdown");
+    }
+
+    /**
+     * Shut down the BrokerLifecycleManager and block until all threads are joined.
+     */
+    public void close() throws InterruptedException {
+        beginShutdown();
+        eventQueue.close();
+    }
+
+    private class SetReadyToUnfenceEvent implements EventQueue.Event {
+        @Override
+        public void run() {
+            readyToUnfence = true;
+            scheduleNextCommunicationImmediately();
+        }
+    }
+
+    private class OfflineDirEvent implements EventQueue.Event {
+
+        private final Uuid dir;
+
+        OfflineDirEvent(Uuid dir) {
+            this.dir = dir;
+        }
+
+        @Override
+        public void run() {
+            offlineDirs.put(dir, false);
+            if (registered) {
+                scheduleNextCommunicationImmediately();
+            }
+        }
+    }
+
+    private class OfflineDirBrokerFailureEvent implements EventQueue.Event {
+
+        private final Uuid offlineDir;
+
+        OfflineDirBrokerFailureEvent(Uuid offlineDir) {
+            this.offlineDir = offlineDir;
+        }
+
+        @Override
+        public void run() {
+            if (!offlineDirs.getOrDefault(offlineDir, false)) {
+                logger.error("Shutting down because couldn't communicate offline log dir {} with controllers", offlineDir);
+                shutdownHook.run();
+            }
+        }
+    }
+
+    private class StartupEvent implements EventQueue.Event {
+
+        private final Supplier<Long> highestMetadataOffsetProvider;
+        private final NodeToControllerChannelManager channelManager;
+        private final String clusterId;
+        private final ListenerCollection advertisedListeners;
+        private final Map<String, VersionRange> supportedFeatures;
+
+        StartupEvent(Supplier<Long> highestMetadataOffsetProvider,
+                     NodeToControllerChannelManager channelManager,
+                     String clusterId,
+                     ListenerCollection advertisedListeners,
+                     Map<String, VersionRange> supportedFeatures) {
+            this.highestMetadataOffsetProvider = highestMetadataOffsetProvider;
+            this.channelManager = channelManager;
+            this.clusterId = clusterId;
+            this.advertisedListeners = advertisedListeners;
+            this.supportedFeatures = supportedFeatures;
+        }
+
+        @Override
+        public void run() {
+            BrokerLifecycleManager.this.highestMetadataOffsetProvider = this.highestMetadataOffsetProvider;
+            BrokerLifecycleManager.this.channelManager = channelManager;
+            BrokerLifecycleManager.this.channelManager.start();
+            state = BrokerState.STARTING;
+            BrokerLifecycleManager.this.clusterId = clusterId;
+            BrokerLifecycleManager.this.advertisedListeners = advertisedListeners.duplicate();
+            BrokerLifecycleManager.this.supportedFeatures = Map.copyOf(supportedFeatures);
+            eventQueue.scheduleDeferred("initialRegistrationTimeout",
+                    new EventQueue.DeadlineFunction(time.nanoseconds() + initialTimeoutNs),
+                    new RegistrationTimeoutEvent());
+            sendBrokerRegistration();
+            logger.info("Incarnation {} of broker {} in cluster {} is now STARTING.", incarnationId, nodeId, clusterId);
+        }
+    }
+
+    private void sendBrokerRegistration() {
+        BrokerRegistrationRequestData.FeatureCollection features = new BrokerRegistrationRequestData.FeatureCollection();
+        supportedFeatures.forEach((name, range) ->
+                features.add(new BrokerRegistrationRequestData.Feature()
+                        .setName(name)
+                        .setMinSupportedVersion(range.min())
+                        .setMaxSupportedVersion(range.max()))
+        );
+        List<Uuid> sortedLogDirs = new ArrayList<>(logDirs);
+        sortedLogDirs.sort(Uuid::compareTo);
+        BrokerRegistrationRequestData data = new BrokerRegistrationRequestData()
+            .setBrokerId(nodeId)
+            .setIsMigratingZkBroker(false)
+            .setClusterId(clusterId)
+            .setFeatures(features)
+            .setIncarnationId(incarnationId)
+            .setListeners(advertisedListeners)
+            .setRack(rack.orElse(null))
+            .setPreviousBrokerEpoch(previousBrokerEpoch.orElse(-1L))
+            .setLogDirs(sortedLogDirs);
+        if (logger.isDebugEnabled()) {
+            logger.debug("Sending broker registration {}", data);
+        }
+        channelManager.sendRequest(new BrokerRegistrationRequest.Builder(data),
+                new BrokerRegistrationResponseHandler());
+        communicationInFlight = true;
+    }
+
+    // the response handler is not invoked from the event handler thread,
+    // so it is not safe to update state here, instead, schedule an event
+    // to continue handling the response on the event handler thread
+    private class BrokerRegistrationResponseHandler implements ControllerRequestCompletionHandler {
+        @Override
+        public void onComplete(ClientResponse response) {
+            eventQueue.prepend(new BrokerRegistrationResponseEvent(response, false));
+        }
+
+        @Override
+        public void onTimeout() {
+            logger.info("Unable to register the broker because the RPC got 
timed out before it could be sent.");
+            eventQueue.prepend(new BrokerRegistrationResponseEvent(null, 
true));
+        }
+    }
+
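+    // Runs on the event handler thread. A null response with timedOut=true marks
+    // an RPC timeout; every failure path backs off before retrying, while success
+    // records the broker epoch and immediately schedules the first heartbeat.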
+    private class BrokerRegistrationResponseEvent implements EventQueue.Event {
+
+        private final ClientResponse response;
+        private final boolean timedOut;
+
+        BrokerRegistrationResponseEvent(ClientResponse response, boolean timedOut) {
+            this.response = response;
+            this.timedOut = timedOut;
+        }
+
+        @Override
+        public void run() {
+            communicationInFlight = false;
+            if (timedOut) {
+                scheduleNextCommunicationAfterFailure();
+                return;
+            }
+            if (response.authenticationException() != null) {
+                logger.error("Unable to register broker $nodeId because of an 
authentication exception.", response.authenticationException());
+                scheduleNextCommunicationAfterFailure();
+            } else if (response.versionMismatch() != null) {
+                logger.error("Unable to register broker $nodeId because of an 
API version problem.", response.versionMismatch());
+                scheduleNextCommunicationAfterFailure();
+            } else if (response.responseBody() == null) {
+                logger.warn("Unable to register broker {}.", nodeId);
+                scheduleNextCommunicationAfterFailure();
+            } else if (!(response.responseBody() instanceof BrokerRegistrationResponse message)) {
+                logger.error("Unable to register broker {} because the controller returned an invalid response type.", nodeId);
+                scheduleNextCommunicationAfterFailure();
+            } else {
+                Errors errorCode = Errors.forCode(message.data().errorCode());
+                if (errorCode == Errors.NONE) {
+                    brokerEpoch = message.data().brokerEpoch();
+                    registered = true;
+                    initialRegistrationSucceeded = true;
+                    logger.info("Successfully registered broker {} with broker 
epoch {}", nodeId, brokerEpoch);
+                    scheduleNextCommunicationImmediately(); // Immediately 
send a heartbeat
+                } else {
+                    logger.info("Unable to register broker {} because the 
controller returned error {}", nodeId, errorCode);
+                    scheduleNextCommunicationAfterFailure();
+                }
+            }
+        }
+    }
+
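+    // Builds and sends a heartbeat reporting the current metadata offset, fencing
+    // and shutdown intent, and any offline log dirs. The handler remembers which
+    // dirs were reported so the response event can mark them as propagated.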
+    private void sendBrokerHeartbeat() {
+        Long metadataOffset = highestMetadataOffsetProvider.get();
+        BrokerHeartbeatRequestData data = new BrokerHeartbeatRequestData()
+            .setBrokerEpoch(brokerEpoch)
+            .setBrokerId(nodeId)
+            .setCurrentMetadataOffset(metadataOffset)
+            .setWantFence(!readyToUnfence)
+            .setWantShutDown(state == BrokerState.PENDING_CONTROLLED_SHUTDOWN)
+            .setOfflineLogDirs(new ArrayList<>(offlineDirs.keySet()));
+        if (logger.isTraceEnabled()) {
+            logger.trace("Sending broker heartbeat {}", data);
+        }
+        BrokerHeartbeatResponseHandler handler = new BrokerHeartbeatResponseHandler(offlineDirs.keySet());
+        channelManager.sendRequest(new BrokerHeartbeatRequest.Builder(data), handler);
+        communicationInFlight = true;
+    }
+
+    // the response handler is not invoked from the event handler thread,
+    // so it is not safe to update state here, instead, schedule an event
+    // to continue handling the response on the event handler thread
+    private class BrokerHeartbeatResponseHandler implements ControllerRequestCompletionHandler {
+
+        private final Iterable<Uuid> currentOfflineDirs;
+
+        BrokerHeartbeatResponseHandler(Iterable<Uuid> currentOfflineDirs) {
+            this.currentOfflineDirs = currentOfflineDirs;
+        }
+
+        @Override
+        public void onComplete(ClientResponse response) {
+            eventQueue.prepend(new BrokerHeartbeatResponseEvent(response, false, currentOfflineDirs));
+        }
+
+        @Override
+        public void onTimeout() {
+            logger.info("Unable to send a heartbeat because the RPC got timed 
out before it could be sent.");
+            eventQueue.prepend(new BrokerHeartbeatResponseEvent(null, true, 
currentOfflineDirs));
+        }
+    }
+
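+    // Drives the broker state machine from heartbeat responses: STARTING moves to
+    // RECOVERY once the broker has caught up with cluster metadata, and RECOVERY
+    // moves to RUNNING once the controller unfences the broker.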
+    private class BrokerHeartbeatResponseEvent implements EventQueue.Event {
+
+        private final ClientResponse response;
+        private final boolean timedOut;
+        private final Iterable<Uuid> currentOfflineDirs;
+
+        BrokerHeartbeatResponseEvent(ClientResponse response, boolean timedOut, Iterable<Uuid> currentOfflineDirs) {
+            this.response = response;
+            this.timedOut = timedOut;
+            this.currentOfflineDirs = currentOfflineDirs;
+        }
+
+        @Override
+        public void run() {
+            communicationInFlight = false;
+            if (timedOut) {
+                scheduleNextCommunicationAfterFailure();
+                return;
+            }
+            if (response.authenticationException() != null) {
+                logger.error("Unable to send broker heartbeat for {} because 
of an authentication exception.",
+                        nodeId, response.authenticationException());
+                scheduleNextCommunicationAfterFailure();
+            } else if (response.versionMismatch() != null) {
+                logger.error("Unable to send broker heartbeat for {} because 
of an API version problem.", nodeId, response.versionMismatch());
+                scheduleNextCommunicationAfterFailure();
+            } else if (response.responseBody() == null) {
+                logger.warn("Unable to send broker heartbeat for {}. 
Retrying.", nodeId);
+                scheduleNextCommunicationAfterFailure();
+            } else if (!(response.responseBody() instanceof BrokerHeartbeatResponse message)) {
+                logger.error("Unable to send broker heartbeat for {} because the controller returned an invalid response type.", nodeId);
+                scheduleNextCommunicationAfterFailure();
+            } else {
+                Errors errorCode = Errors.forCode(message.data().errorCode());
+                if (errorCode == Errors.NONE) {
+                    BrokerHeartbeatResponseData responseData = message.data();
+                    currentOfflineDirs.forEach(cur -> offlineDirs.put(cur, true));
+                    switch (state) {
+                        case STARTING -> {
+                            if (responseData.isCaughtUp()) {
+                                logger.info("The broker has caught up. 
Transitioning from STARTING to RECOVERY.");
+                                state = BrokerState.RECOVERY;
+                                initialCatchUpFuture.complete(null);
+                            } else {
+                                logger.debug("The broker is STARTING. Still 
waiting to catch up with cluster metadata.");
+                            }
+                            // Schedule the heartbeat after only 10 ms so that in the case where
+                            // there is no recovery work to be done, we start up a bit quicker.
+                            scheduleNextCommunication(NANOSECONDS.convert(10, MILLISECONDS));
+                        }
+                        case RECOVERY -> {
+                            if (!responseData.isFenced()) {
+                                logger.info("The broker has been unfenced. 
Transitioning from RECOVERY to RUNNING.");
+                                initialUnfenceFuture.complete(null);
+                                state = BrokerState.RUNNING;
+                            } else {
+                                logger.info("The broker is in RECOVERY.");
+                            }
+                            scheduleNextCommunicationAfterSuccess();
+                        }
+                        case RUNNING -> {
+                            logger.debug("The broker is RUNNING. Processing 
heartbeat response.");
+                            scheduleNextCommunicationAfterSuccess();
+                        }
+                        case PENDING_CONTROLLED_SHUTDOWN -> {
+                            if (!responseData.shouldShutDown()) {
+                                logger.info("The broker is in 
PENDING_CONTROLLED_SHUTDOWN state, still waiting " +
+                                        "for the active controller.");
+                                if (!gotControlledShutdownResponse) {
+                                    // If this is the first pending controlled shutdown response we got,
+                                    // schedule our next heartbeat a little bit sooner than we usually would.
+                                    // In the case where controlled shutdown completes quickly, this will
+                                    // speed things up a little bit.
+                                    scheduleNextCommunication(NANOSECONDS.convert(50, MILLISECONDS));
+                                } else {
+                                    scheduleNextCommunicationAfterSuccess();
+                                }
+                            } else {
+                                logger.info("The controller has asked us to 
exit controlled shutdown.");
+                                beginShutdown();
+                            }
+                            gotControlledShutdownResponse = true;
+                        }
+                        case SHUTTING_DOWN -> logger.info("The broker is SHUTTING_DOWN. Ignoring heartbeat response.");
+                        default -> {
+                            logger.error("Unexpected broker state {}", state);
+                            scheduleNextCommunicationAfterSuccess();
+                        }
+                    }
+                } else {
+                    logger.warn("Broker {} sent a heartbeat request but 
received error {}.", nodeId, errorCode);
+                    scheduleNextCommunicationAfterFailure();
+                }
+            }
+        }
+    }
+
+    private void scheduleNextCommunicationImmediately() {
+        scheduleNextCommunication(0);
+    }
+
+    private void scheduleNextCommunicationAfterFailure() {
+        nextSchedulingShouldBeImmediate = false; // never immediately reschedule after a failure
+        scheduleNextCommunication(NANOSECONDS.convert(config.brokerHeartbeatIntervalMs(), MILLISECONDS));
+    }
+
+    private void scheduleNextCommunicationAfterSuccess() {
+        scheduleNextCommunication(NANOSECONDS.convert(config.brokerHeartbeatIntervalMs(), MILLISECONDS));
+    }
+
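+    // Schedules the next CommunicationEvent. If a previous CommunicationEvent found
+    // a request already in flight, nextSchedulingShouldBeImmediate collapses the
+    // requested interval to zero so the deferred communication happens right away.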
+    private void scheduleNextCommunication(long intervalNs) {
+        long adjustedIntervalNs = nextSchedulingShouldBeImmediate ? 0 : intervalNs;
+        nextSchedulingShouldBeImmediate = false;
+        logger.trace("Scheduling next communication at {}ms from now.", 
MILLISECONDS.convert(adjustedIntervalNs, NANOSECONDS));
+        long deadlineNs = time.nanoseconds() + adjustedIntervalNs;
+        eventQueue.scheduleDeferred("communication", new 
EventQueue.DeadlineFunction(deadlineNs), new CommunicationEvent());
+    }
+
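+    // Armed by StartupEvent: if the initial registration has not succeeded by the
+    // deadline, the event queue begins shutting down.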
+    private class RegistrationTimeoutEvent implements EventQueue.Event {
+        @Override
+        public void run() {
+            if (!initialRegistrationSucceeded) {
+                logger.error("Shutting down because we were unable to register 
with the controller quorum.");
+                eventQueue.beginShutdown("registrationTimeout");
+            }
+        }
+    }
+
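+    // Periodic communication tick: sends a heartbeat once registered, a
+    // registration request otherwise, and defers if a request is still in flight.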
+    private class CommunicationEvent implements EventQueue.Event {
+        @Override
+        public void run() {
+            if (communicationInFlight) {
+                logger.trace("Delaying communication because there is already 
one in flight.");
+                nextSchedulingShouldBeImmediate = true;
+            } else if (registered) {
+                sendBrokerHeartbeat();
+            } else {
+                sendBrokerRegistration();
+            }
+        }
+    }
+
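+    // Final event: completes or cancels the startup futures, stops the channel
+    // manager, and moves the broker to SHUTTING_DOWN.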
+    private class ShutdownEvent implements EventQueue.Event {
+        @Override
+        public void run() throws InterruptedException {
+            logger.info("Transitioning from {} to {}.", state, 
BrokerState.SHUTTING_DOWN);
+            state = BrokerState.SHUTTING_DOWN;
+            controlledShutdownFuture.complete(null);
+            initialCatchUpFuture.cancel(false);
+            initialUnfenceFuture.cancel(false);
+            if (channelManager != null) {
+                channelManager.shutdown();
+                channelManager = null;
+            }
+        }
+    }
+}
diff --git a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
index f0d25f3ca42..e649d6e6035 100644
--- a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
+++ b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
@@ -120,6 +120,22 @@ public abstract class AbstractKafkaConfig extends AbstractConfig {
         return interBrokerListenerNameAndSecurityProtocol().getValue();
     }
 
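+    // Accessors for settings needed by code now living in the server module,
+    // such as BrokerLifecycleManager.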
+    public int initialRegistrationTimeoutMs() {
+        return getInt(KRaftConfigs.INITIAL_BROKER_REGISTRATION_TIMEOUT_MS_CONFIG);
+    }
+
+    public int brokerHeartbeatIntervalMs() {
+        return getInt(KRaftConfigs.BROKER_HEARTBEAT_INTERVAL_MS_CONFIG);
+    }
+
+    public Optional<String> rack() {
+        return Optional.ofNullable(getString(ServerConfigs.BROKER_RACK_CONFIG));
+    }
+
+    public int nodeId() {
+        return getInt(KRaftConfigs.NODE_ID_CONFIG);
+    }
+
     public Map<ListenerName, SecurityProtocol> effectiveListenerSecurityProtocolMap() {
         Map<ListenerName, SecurityProtocol> mapValue =
                 getMap(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG,
