This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 390f126972b KAFKA-20083: Move BrokerLifecycleManager to server module (#21336)
390f126972b is described below
commit 390f126972bd9d7609a0bbc40c0bc388d0be4b1e
Author: Mickael Maison <[email protected]>
AuthorDate: Tue Jan 27 17:47:23 2026 +0100
KAFKA-20083: Move BrokerLifecycleManager to server module (#21336)
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../kafka/server/BrokerLifecycleManager.scala | 622 ------------------
.../src/main/scala/kafka/server/BrokerServer.scala | 4 +-
core/src/main/scala/kafka/server/KafkaConfig.scala | 4 -
.../server/FetchFromFollowerIntegrationTest.scala | 2 +-
.../kafka/server/BrokerLifecycleManagerTest.scala | 20 +-
.../kafka/server/DescribeClusterRequestTest.scala | 2 +-
.../kafka/server/BrokerLifecycleManager.java | 722 +++++++++++++++++++++
.../kafka/server/config/AbstractKafkaConfig.java | 16 +
8 files changed, 753 insertions(+), 639 deletions(-)
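For orientation before the diff: the class now lives in the server module as Java, so Scala callers pass java.util collections instead of Scala ones. A minimal, hypothetical construction sketch against the constructor added below (the `config` object and the directory ID are placeholders, not values from this patch):

    import java.util.Set;

    import org.apache.kafka.common.Uuid;
    import org.apache.kafka.common.utils.Time;
    import org.apache.kafka.server.BrokerLifecycleManager;

    // Assumption: `config` is an AbstractKafkaConfig with node.id populated.
    BrokerLifecycleManager manager = new BrokerLifecycleManager(
        config,
        Time.SYSTEM,
        "broker-" + config.nodeId() + "-",                  // thread name prefix
        Set.of(Uuid.fromString("ad5FLIeCTnaQdai5vOjeng")),  // log directory IDs (placeholder)
        () -> { });                                         // no-op shutdown hook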
diff --git a/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala b/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
deleted file mode 100644
index 2368ebc21cc..00000000000
--- a/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
+++ /dev/null
@@ -1,622 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.server
-
-import java.util
-import java.util.concurrent.TimeUnit.{MILLISECONDS, NANOSECONDS}
-import java.util.concurrent.CompletableFuture
-import kafka.utils.Logging
-import org.apache.kafka.clients.ClientResponse
-import org.apache.kafka.common.Uuid
-import org.apache.kafka.common.message.BrokerRegistrationRequestData.ListenerCollection
-import org.apache.kafka.common.message.{BrokerHeartbeatRequestData, BrokerRegistrationRequestData}
-import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.{BrokerHeartbeatRequest, BrokerHeartbeatResponse, BrokerRegistrationRequest, BrokerRegistrationResponse}
-import org.apache.kafka.metadata.{BrokerState, VersionRange}
-import org.apache.kafka.queue.EventQueue.DeadlineFunction
-import org.apache.kafka.common.utils.{LogContext, Time}
-import org.apache.kafka.queue.{EventQueue, KafkaEventQueue}
-import org.apache.kafka.server.common.{ControllerRequestCompletionHandler, NodeToControllerChannelManager}
-
-import java.util.{Comparator, OptionalLong}
-import scala.collection.mutable
-import scala.jdk.CollectionConverters._
-
-/**
- * The broker lifecycle manager owns the broker state.
- *
- * Its inputs are messages passed in from other parts of the broker and from the
- * controller: requests to start up, or shut down, for example. Its output are the broker
- * state and various futures that can be used to wait for broker state transitions to
- * occur.
- *
- * The lifecycle manager handles registering the broker with the controller, as described
- * in KIP-631. After registration is complete, it handles sending periodic broker
- * heartbeats and processing the responses.
- *
- * This code uses an event queue paradigm. Modifications get translated into events, which
- * are placed on the queue to be processed sequentially. As described in the JavaDoc for
- * each variable, most mutable state can be accessed only from that event queue thread.
- * In some cases we expose a volatile variable which can be read from any thread, but only
- * written from the event queue thread.
- */
-class BrokerLifecycleManager(
- val config: KafkaConfig,
- val time: Time,
- val threadNamePrefix: String,
- val logDirs: Set[Uuid],
- val shutdownHook: () => Unit = () => {}
-) extends Logging {
-
- private def logPrefix(): String = {
- val builder = new StringBuilder("[BrokerLifecycleManager")
- builder.append(" id=").append(config.nodeId)
- builder.append("] ")
- builder.toString()
- }
-
- val logContext = new LogContext(logPrefix())
-
- this.logIdent = logContext.logPrefix()
-
- /**
- * The broker id.
- */
- private val nodeId = config.nodeId
-
- /**
- * The broker rack, or null if there is no configured rack.
- */
- private val rack = config.rack
-
-  /**
-   * How long to wait for registration to succeed before failing the startup process.
-   */
-  private val initialTimeoutNs = MILLISECONDS.toNanos(config.initialRegistrationTimeoutMs.longValue())
-
-  /**
-   * The broker incarnation ID. This ID uniquely identifies each time we start the broker
-   */
-  val incarnationId: Uuid = Uuid.randomUuid()
-
- /**
-   * A future which is completed just as soon as the broker has caught up with the latest
-   * metadata offset for the first time.
-   */
-  val initialCatchUpFuture: CompletableFuture[Void] = new CompletableFuture[Void]()
-
-  /**
-   * A future which is completed when the broker is unfenced for the first time.
-   */
-  val initialUnfenceFuture: CompletableFuture[Void] = new CompletableFuture[Void]()
-
-  /**
-   * A future which is completed when controlled shutdown is done.
-   */
-  val controlledShutdownFuture: CompletableFuture[Void] = new CompletableFuture[Void]()
-
- /**
-   * The broker epoch, or -1 if the broker has not yet registered. This variable can only
-   * be written from the event queue thread.
-   */
-  @volatile private var _brokerEpoch = -1L
-
-  /**
-   * The current broker state. This variable can only be written from the event queue
-   * thread.
-   */
-  @volatile private var _state = BrokerState.NOT_RUNNING
-
-  /**
-   * A thread-safe callback function which gives this manager the current highest metadata
-   * offset. This variable can only be read or written from the event queue thread.
-   */
-  private var _highestMetadataOffsetProvider: () => Long = _
-
- /**
-   * True only if we are ready to unfence the broker. This variable can only be read or
-   * written from the event queue thread.
-   */
-  private var readyToUnfence = false
-
-  /**
-   * Map of accumulated offline directories. The value is true if the directory couldn't be communicated
-   * to the Controller.
-   * This variable can only be read or written from the event queue thread.
-   */
-  private var offlineDirs = mutable.Map[Uuid, Boolean]()
-
-  /**
-   * True if we sent an event queue to the active controller requesting controlled
-   * shutdown. This variable can only be read or written from the event queue thread.
-   */
-  private var gotControlledShutdownResponse = false
-
- /**
- * Whether or not this broker is registered with the controller quorum.
- * This variable can only be read or written from the event queue thread.
- */
- private var registered = false
-
- /**
-   * True if a request has been sent and a response or timeout has not yet been processed.
-   * This variable can only be read or written from the event queue thread.
-   */
-  private var communicationInFlight = false
-
-  /**
-   * True if we should schedule the next communication immediately. This is used to delay
-   * an immediate scheduling of a communication event if one is already in flight.
-   * This variable can only be read or written from the event queue thread.
-   */
-  private var nextSchedulingShouldBeImmediate = false
-
-  /**
-   * True if the initial registration succeeded. This variable can only be read or
-   * written from the event queue thread.
-   */
-  private var initialRegistrationSucceeded = false
-
-  /**
-   * The cluster ID, or null if this manager has not been started yet. This variable can
-   * only be read or written from the event queue thread.
-   */
-  private var _clusterId: String = _
-
-  /**
-   * The listeners which this broker advertises. This variable can only be read or
-   * written from the event queue thread.
-   */
-  private var _advertisedListeners: ListenerCollection = _
-
-  /**
-   * The features supported by this broker. This variable can only be read or written
-   * from the event queue thread.
-   */
-  private var _supportedFeatures: util.Map[String, VersionRange] = _
-
- /**
-   * The channel manager, or null if this manager has not been started yet. This variable
-   * can only be read or written from the event queue thread.
-   */
-  private var _channelManager: NodeToControllerChannelManager = _
-
-  /**
-   * The broker epoch from the previous run, or empty if the epoch is not found.
-   */
-  @volatile private var previousBrokerEpoch: OptionalLong = OptionalLong.empty()
-
- /**
- * The event queue.
- */
- private[server] val eventQueue = new KafkaEventQueue(time,
- logContext,
- threadNamePrefix + "lifecycle-manager-",
- new ShutdownEvent())
-
- /**
- * Start the BrokerLifecycleManager.
- *
-   * @param highestMetadataOffsetProvider Provides the current highest metadata offset.
-   * @param channelManager                The NodeToControllerChannelManager to use.
-   * @param clusterId                     The cluster ID.
-   * @param advertisedListeners           The advertised listeners for this broker.
- * @param supportedFeatures The features for this broker.
- * @param previousBrokerEpoch The broker epoch before the reboot.
- *
- */
- def start(highestMetadataOffsetProvider: () => Long,
- channelManager: NodeToControllerChannelManager,
- clusterId: String,
- advertisedListeners: ListenerCollection,
- supportedFeatures: util.Map[String, VersionRange],
- previousBrokerEpoch: OptionalLong): Unit = {
- this.previousBrokerEpoch = previousBrokerEpoch
- eventQueue.append(new StartupEvent(highestMetadataOffsetProvider,
- channelManager, clusterId, advertisedListeners, supportedFeatures))
- }
-
- def setReadyToUnfence(): CompletableFuture[Void] = {
- eventQueue.append(new SetReadyToUnfenceEvent())
- initialUnfenceFuture
- }
-
- /**
- * Propagate directory failures to the controller.
- * @param directory The ID for the directory that failed.
- */
- def propagateDirectoryFailure(directory: Uuid, timeout: Long): Unit = {
- eventQueue.append(new OfflineDirEvent(directory))
-    // If we can't communicate the offline directory to the controller, we should shut down.
- eventQueue.scheduleDeferred("offlineDirFailure",
- new DeadlineFunction(time.nanoseconds() + MILLISECONDS.toNanos(timeout)),
- new OfflineDirBrokerFailureEvent(directory))
- }
-
- def resendBrokerRegistration(): Unit = {
- eventQueue.append(new ResendBrokerRegistrationEvent())
- }
-
- private class ResendBrokerRegistrationEvent extends EventQueue.Event {
- override def run(): Unit = {
- registered = false
- scheduleNextCommunicationImmediately()
- }
- }
-
- def brokerEpoch: Long = _brokerEpoch
-
- def state: BrokerState = _state
-
- private class BeginControlledShutdownEvent extends EventQueue.Event {
- override def run(): Unit = {
- _state match {
- case BrokerState.PENDING_CONTROLLED_SHUTDOWN =>
- info("Attempted to enter pending controlled shutdown state, but we
are " +
- "already in that state.")
- case BrokerState.RUNNING =>
- info("Beginning controlled shutdown.")
- _state = BrokerState.PENDING_CONTROLLED_SHUTDOWN
- // Send the next heartbeat immediately in order to let the controller
- // begin processing the controlled shutdown as soon as possible.
- scheduleNextCommunicationImmediately()
-
- case _ =>
- info(s"Skipping controlled shutdown because we are in state
${_state}.")
- beginShutdown()
- }
- }
- }
-
- /**
- * Enter the controlled shutdown state if we are in RUNNING state.
- * Or, if we're not running, shut down immediately.
- */
- def beginControlledShutdown(): Unit = {
- eventQueue.append(new BeginControlledShutdownEvent())
- }
-
- /**
- * Start shutting down the BrokerLifecycleManager, but do not block.
- */
- def beginShutdown(): Unit = {
- eventQueue.beginShutdown("beginShutdown")
- }
-
- /**
-   * Shut down the BrokerLifecycleManager and block until all threads are joined.
- */
- def close(): Unit = {
- beginShutdown()
- eventQueue.close()
- }
-
- private class SetReadyToUnfenceEvent extends EventQueue.Event {
- override def run(): Unit = {
- readyToUnfence = true
- scheduleNextCommunicationImmediately()
- }
- }
-
- private class OfflineDirEvent(val dir: Uuid) extends EventQueue.Event {
- override def run(): Unit = {
- if (offlineDirs.isEmpty) {
- offlineDirs = mutable.Map(dir -> false)
- } else {
- offlineDirs += (dir -> false)
- }
- if (registered) {
- scheduleNextCommunicationImmediately()
- }
- }
- }
-
-  private class OfflineDirBrokerFailureEvent(offlineDir: Uuid) extends EventQueue.Event {
-    override def run(): Unit = {
-      if (!offlineDirs.getOrElse(offlineDir, false)) {
-        error(s"Shutting down because couldn't communicate offline log dir $offlineDir with controllers")
- shutdownHook()
- }
- }
- }
-
- private class StartupEvent(highestMetadataOffsetProvider: () => Long,
- channelManager: NodeToControllerChannelManager,
- clusterId: String,
- advertisedListeners: ListenerCollection,
-                             supportedFeatures: util.Map[String, VersionRange]) extends EventQueue.Event {
- override def run(): Unit = {
- _highestMetadataOffsetProvider = highestMetadataOffsetProvider
- _channelManager = channelManager
- _channelManager.start()
- _state = BrokerState.STARTING
- _clusterId = clusterId
- _advertisedListeners = advertisedListeners.duplicate()
-      _supportedFeatures = new util.HashMap[String, VersionRange](supportedFeatures)
- eventQueue.scheduleDeferred("initialRegistrationTimeout",
- new DeadlineFunction(time.nanoseconds() + initialTimeoutNs),
- new RegistrationTimeoutEvent())
- sendBrokerRegistration()
- info(s"Incarnation $incarnationId of broker $nodeId in cluster
$clusterId " +
- "is now STARTING.")
- }
- }
-
- private def sendBrokerRegistration(): Unit = {
- val features = new BrokerRegistrationRequestData.FeatureCollection()
- _supportedFeatures.asScala.foreach {
-      case (name, range) => features.add(new BrokerRegistrationRequestData.Feature().
- setName(name).
- setMinSupportedVersion(range.min()).
- setMaxSupportedVersion(range.max()))
- }
- val sortedLogDirs = new util.ArrayList[Uuid]
- logDirs.foreach(sortedLogDirs.add)
- sortedLogDirs.sort(new Comparator[Uuid]() {
- override def compare(a: Uuid, b: Uuid): Int = a.compareTo(b)
- })
- val data = new BrokerRegistrationRequestData().
- setBrokerId(nodeId).
- setIsMigratingZkBroker(false).
- setClusterId(_clusterId).
- setFeatures(features).
- setIncarnationId(incarnationId).
- setListeners(_advertisedListeners).
- setRack(rack.orNull).
- setPreviousBrokerEpoch(previousBrokerEpoch.orElse(-1L)).
- setLogDirs(sortedLogDirs)
- if (isDebugEnabled) {
- debug(s"Sending broker registration $data")
- }
- _channelManager.sendRequest(new BrokerRegistrationRequest.Builder(data),
- new BrokerRegistrationResponseHandler())
- communicationInFlight = true
- }
-
- // the response handler is not invoked from the event handler thread,
- // so it is not safe to update state here, instead, schedule an event
- // to continue handling the response on the event handler thread
-  private class BrokerRegistrationResponseHandler extends ControllerRequestCompletionHandler {
- override def onComplete(response: ClientResponse): Unit = {
- eventQueue.prepend(new BrokerRegistrationResponseEvent(response, false))
- }
-
- override def onTimeout(): Unit = {
- info(s"Unable to register the broker because the RPC got timed out
before it could be sent.")
- eventQueue.prepend(new BrokerRegistrationResponseEvent(null, true))
- }
- }
-
-  private class BrokerRegistrationResponseEvent(response: ClientResponse, timedOut: Boolean) extends EventQueue.Event {
- override def run(): Unit = {
- communicationInFlight = false
- if (timedOut) {
- scheduleNextCommunicationAfterFailure()
- return
- }
- if (response.authenticationException() != null) {
- error(s"Unable to register broker $nodeId because of an authentication
exception.",
- response.authenticationException())
- scheduleNextCommunicationAfterFailure()
- } else if (response.versionMismatch() != null) {
- error(s"Unable to register broker $nodeId because of an API version
problem.",
- response.versionMismatch())
- scheduleNextCommunicationAfterFailure()
- } else if (response.responseBody() == null) {
- warn(s"Unable to register broker $nodeId.")
- scheduleNextCommunicationAfterFailure()
- } else if
(!response.responseBody().isInstanceOf[BrokerRegistrationResponse]) {
- error(s"Unable to register broker $nodeId because the controller
returned an " +
- "invalid response type.")
- scheduleNextCommunicationAfterFailure()
- } else {
- val message =
response.responseBody().asInstanceOf[BrokerRegistrationResponse]
- val errorCode = Errors.forCode(message.data().errorCode())
- if (errorCode == Errors.NONE) {
- _brokerEpoch = message.data().brokerEpoch()
- registered = true
- initialRegistrationSucceeded = true
- info(s"Successfully registered broker $nodeId with broker epoch
${_brokerEpoch}")
- scheduleNextCommunicationImmediately() // Immediately send a
heartbeat
- } else {
- info(s"Unable to register broker $nodeId because the controller
returned " +
- s"error $errorCode")
- scheduleNextCommunicationAfterFailure()
- }
- }
- }
- }
-
- private def sendBrokerHeartbeat(): Unit = {
- val metadataOffset = _highestMetadataOffsetProvider()
- val data = new BrokerHeartbeatRequestData().
- setBrokerEpoch(_brokerEpoch).
- setBrokerId(nodeId).
- setCurrentMetadataOffset(metadataOffset).
- setWantFence(!readyToUnfence).
- setWantShutDown(_state == BrokerState.PENDING_CONTROLLED_SHUTDOWN).
- setOfflineLogDirs(offlineDirs.keys.toSeq.asJava)
- if (isTraceEnabled) {
- trace(s"Sending broker heartbeat $data")
- }
- val handler = new BrokerHeartbeatResponseHandler(offlineDirs.keys)
-    _channelManager.sendRequest(new BrokerHeartbeatRequest.Builder(data), handler)
- communicationInFlight = true
- }
-
- // the response handler is not invoked from the event handler thread,
- // so it is not safe to update state here, instead, schedule an event
- // to continue handling the response on the event handler thread
-  private class BrokerHeartbeatResponseHandler(currentOfflineDirs: Iterable[Uuid]) extends ControllerRequestCompletionHandler {
-    override def onComplete(response: ClientResponse): Unit = {
-      eventQueue.prepend(new BrokerHeartbeatResponseEvent(response, false, currentOfflineDirs))
- }
-
- override def onTimeout(): Unit = {
- info("Unable to send a heartbeat because the RPC got timed out before it
could be sent.")
- eventQueue.prepend(new BrokerHeartbeatResponseEvent(null, true,
currentOfflineDirs))
- }
- }
-
-  private class BrokerHeartbeatResponseEvent(response: ClientResponse, timedOut: Boolean,
-                                             currentOfflineDirs: Iterable[Uuid]) extends EventQueue.Event {
- override def run(): Unit = {
- communicationInFlight = false
- if (timedOut) {
- scheduleNextCommunicationAfterFailure()
- return
- }
- if (response.authenticationException() != null) {
- error(s"Unable to send broker heartbeat for $nodeId because of an " +
- "authentication exception.", response.authenticationException())
- scheduleNextCommunicationAfterFailure()
- } else if (response.versionMismatch() != null) {
- error(s"Unable to send broker heartbeat for $nodeId because of an API
" +
- "version problem.", response.versionMismatch())
- scheduleNextCommunicationAfterFailure()
- } else if (response.responseBody() == null) {
- warn(s"Unable to send broker heartbeat for $nodeId. Retrying.")
- scheduleNextCommunicationAfterFailure()
- } else if
(!response.responseBody().isInstanceOf[BrokerHeartbeatResponse]) {
- error(s"Unable to send broker heartbeat for $nodeId because the
controller " +
- "returned an invalid response type.")
- scheduleNextCommunicationAfterFailure()
- } else {
- val message =
response.responseBody().asInstanceOf[BrokerHeartbeatResponse]
- val errorCode = Errors.forCode(message.data().errorCode())
- if (errorCode == Errors.NONE) {
- val responseData = message.data()
- currentOfflineDirs.foreach(cur => offlineDirs.put(cur, true))
- _state match {
- case BrokerState.STARTING =>
- if (responseData.isCaughtUp) {
- info(s"The broker has caught up. Transitioning from STARTING
to RECOVERY.")
- _state = BrokerState.RECOVERY
- initialCatchUpFuture.complete(null)
- } else {
- debug(s"The broker is STARTING. Still waiting to catch up with
cluster metadata.")
- }
- // Schedule the heartbeat after only 10 ms so that in the case
where
- // there is no recovery work to be done, we start up a bit
quicker.
- scheduleNextCommunication(NANOSECONDS.convert(10, MILLISECONDS))
- case BrokerState.RECOVERY =>
- if (!responseData.isFenced) {
- info(s"The broker has been unfenced. Transitioning from
RECOVERY to RUNNING.")
- initialUnfenceFuture.complete(null)
- _state = BrokerState.RUNNING
- } else {
- info(s"The broker is in RECOVERY.")
- }
- scheduleNextCommunicationAfterSuccess()
- case BrokerState.RUNNING =>
- debug(s"The broker is RUNNING. Processing heartbeat response.")
- scheduleNextCommunicationAfterSuccess()
- case BrokerState.PENDING_CONTROLLED_SHUTDOWN =>
- if (!responseData.shouldShutDown()) {
- info(s"The broker is in PENDING_CONTROLLED_SHUTDOWN state,
still waiting " +
- "for the active controller.")
- if (!gotControlledShutdownResponse) {
- // If this is the first pending controlled shutdown response
we got,
- // schedule our next heartbeat a little bit sooner than we
usually would.
- // In the case where controlled shutdown completes quickly,
this will
- // speed things up a little bit.
- scheduleNextCommunication(NANOSECONDS.convert(50,
MILLISECONDS))
- } else {
- scheduleNextCommunicationAfterSuccess()
- }
- } else {
- info(s"The controller has asked us to exit controlled
shutdown.")
- beginShutdown()
- }
- gotControlledShutdownResponse = true
- case BrokerState.SHUTTING_DOWN =>
- info(s"The broker is SHUTTING_DOWN. Ignoring heartbeat
response.")
- case _ =>
- error(s"Unexpected broker state ${_state}")
- scheduleNextCommunicationAfterSuccess()
- }
- } else {
- warn(s"Broker $nodeId sent a heartbeat request but received error
$errorCode.")
- scheduleNextCommunicationAfterFailure()
- }
- }
- }
- }
-
- private def scheduleNextCommunicationImmediately(): Unit = {
- scheduleNextCommunication(0)
- }
-
- private def scheduleNextCommunicationAfterFailure(): Unit = {
-    nextSchedulingShouldBeImmediate = false // never immediately reschedule after a failure
- scheduleNextCommunication(NANOSECONDS.convert(
- config.brokerHeartbeatIntervalMs.longValue() , MILLISECONDS))
- }
-
- private def scheduleNextCommunicationAfterSuccess(): Unit = {
- scheduleNextCommunication(NANOSECONDS.convert(
- config.brokerHeartbeatIntervalMs.longValue() , MILLISECONDS))
- }
-
- private def scheduleNextCommunication(intervalNs: Long): Unit = {
-    val adjustedIntervalNs = if (nextSchedulingShouldBeImmediate) 0 else intervalNs
-    nextSchedulingShouldBeImmediate = false
-    trace(s"Scheduling next communication at ${MILLISECONDS.convert(adjustedIntervalNs, NANOSECONDS)} " +
-      "ms from now.")
- val deadlineNs = time.nanoseconds() + adjustedIntervalNs
- eventQueue.scheduleDeferred("communication",
- new DeadlineFunction(deadlineNs),
- new CommunicationEvent())
- }
-
- private class RegistrationTimeoutEvent extends EventQueue.Event {
- override def run(): Unit = {
- if (!initialRegistrationSucceeded) {
- error("Shutting down because we were unable to register with the
controller quorum.")
- eventQueue.beginShutdown("registrationTimeout")
- }
- }
- }
-
- private class CommunicationEvent extends EventQueue.Event {
- override def run(): Unit = {
- if (communicationInFlight) {
- trace("Delaying communication because there is already one in flight.")
- nextSchedulingShouldBeImmediate = true
- } else if (registered) {
- sendBrokerHeartbeat()
- } else {
- sendBrokerRegistration()
- }
- }
- }
-
- private class ShutdownEvent extends EventQueue.Event {
- override def run(): Unit = {
- info(s"Transitioning from ${_state} to ${BrokerState.SHUTTING_DOWN}.")
- _state = BrokerState.SHUTTING_DOWN
- controlledShutdownFuture.complete(null)
- initialCatchUpFuture.cancel(false)
- initialUnfenceFuture.cancel(false)
- if (_channelManager != null) {
- _channelManager.shutdown()
- _channelManager = null
- }
- }
- }
-}
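The JavaDoc above carries over verbatim to the Java port below: callers never touch lifecycle state directly, they append events to the queue and observe either the volatile state snapshot or the exposed futures. A rough illustration against the ported API (the 30-second timeout is an arbitrary choice, not from this patch):

    // Safe from any thread: state() is a volatile read, never a lock.
    BrokerState current = manager.state();
    // Transitions happen only on the event-queue thread, so callers block
    // on the returned future rather than polling for RUNNING.
    manager.setReadyToUnfence().get(30, java.util.concurrent.TimeUnit.SECONDS);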
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index d9cb3aca495..624d7a6ed89 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -54,7 +54,7 @@ import org.apache.kafka.server.share.persister.{DefaultStatePersister, NoOpState
import org.apache.kafka.server.share.session.ShareSessionCache
import org.apache.kafka.server.util.timer.{SystemTimer, SystemTimerReaper}
import org.apache.kafka.server.util.{Deadline, FutureUtils, KafkaScheduler}
-import org.apache.kafka.server.{AssignmentsManager, BrokerFeatures, ClientMetricsManager, DefaultApiVersionManager, DelayedActionQueue, KRaftTopicCreator, NodeToControllerChannelManagerImpl, ProcessRole, RaftControllerNodeProvider}
+import org.apache.kafka.server.{AssignmentsManager, BrokerFeatures, BrokerLifecycleManager, ClientMetricsManager, DefaultApiVersionManager, DelayedActionQueue, KRaftTopicCreator, NodeToControllerChannelManagerImpl, ProcessRole, RaftControllerNodeProvider}
import org.apache.kafka.server.transaction.AddPartitionsToTxnManager
import org.apache.kafka.storage.internals.log.LogDirFailureChannel
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
@@ -220,7 +220,7 @@ class BrokerServer(
lifecycleManager = new BrokerLifecycleManager(config,
time,
s"broker-${config.nodeId}-",
- logDirs = logManager.directoryIdsSet,
+ logManager.directoryIdsSet.asJava,
() => new Thread(() => shutdown(), "kafka-shutdown-thread").start())
     // Enable delegation token cache for all SCRAM mechanisms to simplify dynamic update.
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 65097f53a5c..8c3c6094e68 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -288,9 +288,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
def quotaConfig: QuotaConfig = _quotaConfig
/** ********* General Configuration ***********/
- val nodeId: Int = getInt(KRaftConfigs.NODE_ID_CONFIG)
-  val initialRegistrationTimeoutMs: Int = getInt(KRaftConfigs.INITIAL_BROKER_REGISTRATION_TIMEOUT_MS_CONFIG)
-  val brokerHeartbeatIntervalMs: Int = getInt(KRaftConfigs.BROKER_HEARTBEAT_INTERVAL_MS_CONFIG)
   val brokerSessionTimeoutMs: Int = getInt(KRaftConfigs.BROKER_SESSION_TIMEOUT_MS_CONFIG)
   val controllerPerformanceSamplePeriodMs: Long = getLong(KRaftConfigs.CONTROLLER_PERFORMANCE_SAMPLE_PERIOD_MS)
   val controllerPerformanceAlwaysLogThresholdMs: Long = getLong(KRaftConfigs.CONTROLLER_PERFORMANCE_ALWAYS_LOG_THRESHOLD_MS)
@@ -379,7 +376,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
   def numNetworkThreads = getInt(SocketServerConfigs.NUM_NETWORK_THREADS_CONFIG)
/***************** rack configuration **************/
- val rack = Option(getString(ServerConfigs.BROKER_RACK_CONFIG))
   val replicaSelectorClassName = Option(getString(ReplicationConfigs.REPLICA_SELECTOR_CLASS_CONFIG))
/** ********* Log Configuration ***********/
diff --git a/core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala b/core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala
index a734c464d0f..f9cef4bd39c 100644
--- a/core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala
@@ -197,7 +197,7 @@ class FetchFromFollowerIntegrationTest extends BaseFetchRequestTest {
     consumerConfig.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, classOf[RangeAssignor].getName)
     val consumers = brokers.map { server =>
       consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
-      consumerConfig.setProperty(ConsumerConfig.CLIENT_RACK_CONFIG, server.config.rack.orNull)
+      consumerConfig.setProperty(ConsumerConfig.CLIENT_RACK_CONFIG, server.config.rack.orElse(null))
       consumerConfig.setProperty(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, s"instance-${server.config.brokerId}")
       consumerConfig.setProperty(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000")
       consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
diff --git a/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala b/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
index 478cd74396c..86d587eae0f 100644
--- a/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
@@ -17,6 +17,7 @@
package kafka.server
+import java.util
import java.util.{Collections, OptionalLong, Properties}
import kafka.utils.TestUtils
import org.apache.kafka.common.Node
@@ -27,6 +28,7 @@ import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, Brok
import org.apache.kafka.metadata.BrokerState
import org.apache.kafka.raft.{KRaftConfigs, QuorumConfig}
import org.apache.kafka.server.config.ServerLogConfigs
+import org.apache.kafka.server.BrokerLifecycleManager
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, Test, Timeout}
@@ -59,14 +61,14 @@ class BrokerLifecycleManagerTest {
@Test
def testCreateAndClose(): Unit = {
val context = new RegistrationTestContext(configProperties)
-    manager = new BrokerLifecycleManager(context.config, context.time, "create-and-close-", Set(Uuid.fromString("oFoTeS9QT0aAyCyH41v45A")))
+    manager = new BrokerLifecycleManager(context.config, context.time, "create-and-close-", util.Set.of(Uuid.fromString("oFoTeS9QT0aAyCyH41v45A")))
manager.close()
}
@Test
def testCreateStartAndClose(): Unit = {
val context = new RegistrationTestContext(configProperties)
-    manager = new BrokerLifecycleManager(context.config, context.time, "create-start-and-close-", Set(Uuid.fromString("uiUADXZWTPixVvp6UWFWnw")))
+    manager = new BrokerLifecycleManager(context.config, context.time, "create-start-and-close-", util.Set.of(Uuid.fromString("uiUADXZWTPixVvp6UWFWnw")))
assertEquals(BrokerState.NOT_RUNNING, manager.state)
manager.start(() => context.highestMetadataOffset.get(),
context.mockChannelManager, context.clusterId,
context.advertisedListeners,
@@ -81,7 +83,7 @@ class BrokerLifecycleManagerTest {
@Test
def testSuccessfulRegistration(): Unit = {
val context = new RegistrationTestContext(configProperties)
-    manager = new BrokerLifecycleManager(context.config, context.time, "successful-registration-", Set(Uuid.fromString("gCpDJgRlS2CBCpxoP2VMsQ")))
+    manager = new BrokerLifecycleManager(context.config, context.time, "successful-registration-", util.Set.of(Uuid.fromString("gCpDJgRlS2CBCpxoP2VMsQ")))
val controllerNode = new Node(3000, "localhost", 8021)
context.controllerNodeProvider.node.set(controllerNode)
manager.start(() => context.highestMetadataOffset.get(),
@@ -103,7 +105,7 @@ class BrokerLifecycleManagerTest {
def testRegistrationTimeout(): Unit = {
val context = new RegistrationTestContext(configProperties)
val controllerNode = new Node(3000, "localhost", 8021)
-    manager = new BrokerLifecycleManager(context.config, context.time, "registration-timeout-", Set(Uuid.fromString("9XBOAtr4T0Wbx2sbiWh6xg")))
+    manager = new BrokerLifecycleManager(context.config, context.time, "registration-timeout-", util.Set.of(Uuid.fromString("9XBOAtr4T0Wbx2sbiWh6xg")))
context.controllerNodeProvider.node.set(controllerNode)
def newDuplicateRegistrationResponse(): Unit = {
context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse(
@@ -143,7 +145,7 @@ class BrokerLifecycleManagerTest {
@Test
def testControlledShutdown(): Unit = {
val context = new RegistrationTestContext(configProperties)
-    manager = new BrokerLifecycleManager(context.config, context.time, "controlled-shutdown-", Set(Uuid.fromString("B4RtUz1ySGip3A7ZFYB2dg")))
+    manager = new BrokerLifecycleManager(context.config, context.time, "controlled-shutdown-", util.Set.of(Uuid.fromString("B4RtUz1ySGip3A7ZFYB2dg")))
val controllerNode = new Node(3000, "localhost", 8021)
context.controllerNodeProvider.node.set(controllerNode)
context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse(
@@ -224,7 +226,7 @@ class BrokerLifecycleManagerTest {
@Test
def testAlwaysSendsAccumulatedOfflineDirs(): Unit = {
val ctx = new RegistrationTestContext(configProperties)
-    manager = new BrokerLifecycleManager(ctx.config, ctx.time, "offline-dirs-sent-in-heartbeat-", Set(Uuid.fromString("0IbF1sjhSGG6FNvnrPbqQg")))
+    manager = new BrokerLifecycleManager(ctx.config, ctx.time, "offline-dirs-sent-in-heartbeat-", util.Set.of(Uuid.fromString("0IbF1sjhSGG6FNvnrPbqQg")))
val controllerNode = new Node(3000, "localhost", 8021)
ctx.controllerNodeProvider.node.set(controllerNode)
@@ -248,7 +250,7 @@ class BrokerLifecycleManagerTest {
@Test
def testRegistrationIncludesDirs(): Unit = {
-    val logDirs = Set("ad5FLIeCTnaQdai5vOjeng", "ybdzUKmYSLK6oiIpI6CPlw").map(Uuid.fromString)
+    val logDirs = util.Set.of(Uuid.fromString("ad5FLIeCTnaQdai5vOjeng"), Uuid.fromString("ybdzUKmYSLK6oiIpI6CPlw"))
val ctx = new RegistrationTestContext(configProperties)
     manager = new BrokerLifecycleManager(ctx.config, ctx.time, "registration-includes-dirs-", logDirs)
val controllerNode = new Node(3000, "localhost", 8021)
@@ -261,13 +263,13 @@ class BrokerLifecycleManagerTest {
Collections.emptyMap(), OptionalLong.empty())
     val request = poll(ctx, manager, registration).asInstanceOf[BrokerRegistrationRequest]
- assertEquals(logDirs, request.data.logDirs().asScala.toSet)
+ assertEquals(logDirs, new util.HashSet(request.data.logDirs()))
}
@Test
def testKraftJBODMetadataVersionUpdateEvent(): Unit = {
val ctx = new RegistrationTestContext(configProperties)
-    manager = new BrokerLifecycleManager(ctx.config, ctx.time, "jbod-metadata-version-update", Set(Uuid.fromString("gCpDJgRlS2CBCpxoP2VMsQ")))
+    manager = new BrokerLifecycleManager(ctx.config, ctx.time, "jbod-metadata-version-update", util.Set.of(Uuid.fromString("gCpDJgRlS2CBCpxoP2VMsQ")))
val controllerNode = new Node(3000, "localhost", 8021)
ctx.controllerNodeProvider.node.set(controllerNode)
diff --git a/core/src/test/scala/unit/kafka/server/DescribeClusterRequestTest.scala b/core/src/test/scala/unit/kafka/server/DescribeClusterRequestTest.scala
index 1d3048cec6a..96283d60389 100644
--- a/core/src/test/scala/unit/kafka/server/DescribeClusterRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DescribeClusterRequestTest.scala
@@ -62,7 +62,7 @@ class DescribeClusterRequestTest extends BaseRequestTest {
.setBrokerId(server.config.brokerId)
.setHost("localhost")
.setPort(server.socketServer.boundPort(listenerName))
- .setRack(server.config.rack.orNull)
+ .setRack(server.config.rack.orElse(null))
}.toSet
val expectedClusterId = brokers.last.clusterId
diff --git a/server/src/main/java/org/apache/kafka/server/BrokerLifecycleManager.java b/server/src/main/java/org/apache/kafka/server/BrokerLifecycleManager.java
new file mode 100644
index 00000000000..e148fb1c0b1
--- /dev/null
+++ b/server/src/main/java/org/apache/kafka/server/BrokerLifecycleManager.java
@@ -0,0 +1,722 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
+import org.apache.kafka.common.message.BrokerHeartbeatResponseData;
+import org.apache.kafka.common.message.BrokerRegistrationRequestData;
+import org.apache.kafka.common.message.BrokerRegistrationRequestData.ListenerCollection;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.BrokerHeartbeatRequest;
+import org.apache.kafka.common.requests.BrokerHeartbeatResponse;
+import org.apache.kafka.common.requests.BrokerRegistrationRequest;
+import org.apache.kafka.common.requests.BrokerRegistrationResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.metadata.BrokerState;
+import org.apache.kafka.metadata.VersionRange;
+import org.apache.kafka.queue.EventQueue;
+import org.apache.kafka.queue.KafkaEventQueue;
+import org.apache.kafka.server.common.ControllerRequestCompletionHandler;
+import org.apache.kafka.server.common.NodeToControllerChannelManager;
+import org.apache.kafka.server.config.AbstractKafkaConfig;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+
+/**
+ * The broker lifecycle manager owns the broker state.
+ *
+ * Its inputs are messages passed in from other parts of the broker and from the
+ * controller: requests to start up, or shut down, for example. Its outputs are the broker
+ * state and various futures that can be used to wait for broker state transitions to
+ * occur.
+ *
+ * The lifecycle manager handles registering the broker with the controller, as described
+ * in KIP-631. After registration is complete, it handles sending periodic broker
+ * heartbeats and processing the responses.
+ *
+ * This code uses an event queue paradigm. Modifications get translated into events, which
+ * are placed on the queue to be processed sequentially. As described in the JavaDoc for
+ * each variable, most mutable state can be accessed only from that event queue thread.
+ * In some cases we expose a volatile variable which can be read from any thread, but only
+ * written from the event queue thread.
+ */
+public class BrokerLifecycleManager {
+
+ private final Logger logger;
+ private final KafkaEventQueue eventQueue;
+ private final AbstractKafkaConfig config;
+ private final Time time;
+ private final Set<Uuid> logDirs;
+ private final Runnable shutdownHook;
+
+ /**
+ * The broker id.
+ */
+ private final int nodeId;
+
+ /**
+ * The broker rack, or null if there is no configured rack.
+ */
+ private final Optional<String> rack;
+
+ /**
+     * How long to wait for registration to succeed before failing the startup process.
+     */
+    private final long initialTimeoutNs;
+
+    /**
+     * The broker incarnation ID. This ID uniquely identifies each time we start the broker
+     */
+    private final Uuid incarnationId = Uuid.randomUuid();
+
+ /**
+     * A future which is completed just as soon as the broker has caught up with the latest
+     * metadata offset for the first time.
+     */
+    private final CompletableFuture<Void> initialCatchUpFuture = new CompletableFuture<>();
+
+    /**
+     * A future which is completed when the broker is unfenced for the first time.
+     */
+    private final CompletableFuture<Void> initialUnfenceFuture = new CompletableFuture<>();
+
+    /**
+     * A future which is completed when controlled shutdown is done.
+     */
+    private final CompletableFuture<Void> controlledShutdownFuture = new CompletableFuture<>();
+
+ /**
+     * The broker epoch, or -1 if the broker has not yet registered. This variable can only
+     * be written from the event queue thread.
+     */
+    private volatile long brokerEpoch = -1L;
+
+    /**
+     * The current broker state. This variable can only be written from the event queue
+     * thread.
+     */
+    private volatile BrokerState state = BrokerState.NOT_RUNNING;
+
+    /**
+     * A thread-safe callback function which gives this manager the current highest metadata
+     * offset. This variable can only be read or written from the event queue thread.
+ */
+ private Supplier<Long> highestMetadataOffsetProvider;
+
+ /**
+     * True only if we are ready to unfence the broker. This variable can only be read or
+ * written from the event queue thread.
+ */
+ private boolean readyToUnfence = false;
+
+ /**
+     * Map of accumulated offline directories. The value is true if the directory has been successfully communicated
+ * to the Controller.
+ * This variable can only be read or written from the event queue thread.
+ */
+ private Map<Uuid, Boolean> offlineDirs = new HashMap<>();
+
+ /**
+     * True if we sent a request to the active controller requesting controlled
+     * shutdown. This variable can only be read or written from the event queue thread.
+ */
+ private boolean gotControlledShutdownResponse = false;
+
+ /**
+ * Whether this broker is registered with the controller quorum.
+ * This variable can only be read or written from the event queue thread.
+ */
+ private boolean registered = false;
+
+ /**
+     * True if a request has been sent and a response or timeout has not yet been processed.
+ * This variable can only be read or written from the event queue thread.
+ */
+ private boolean communicationInFlight = false;
+
+ /**
+     * True if we should schedule the next communication immediately. This is used to delay
+     * an immediate scheduling of a communication event if one is already in flight.
+ * This variable can only be read or written from the event queue thread.
+ */
+ private boolean nextSchedulingShouldBeImmediate = false;
+
+ /**
+     * True if the initial registration succeeded. This variable can only be read or
+ * written from the event queue thread.
+ */
+ private boolean initialRegistrationSucceeded = false;
+
+ /**
+     * The cluster ID, or null if this manager has not been started yet. This variable can
+ * only be read or written from the event queue thread.
+ */
+ private String clusterId;
+
+ /**
+     * The listeners which this broker advertises. This variable can only be read or
+ * written from the event queue thread.
+ */
+ private ListenerCollection advertisedListeners;
+
+ /**
+     * The features supported by this broker. This variable can only be read or written
+ * from the event queue thread.
+ */
+ private Map<String, VersionRange> supportedFeatures;
+
+ /**
+     * The channel manager, or null if this manager has not been started yet. This variable
+ * can only be read or written from the event queue thread.
+ */
+ private NodeToControllerChannelManager channelManager;
+
+ /**
+     * The broker epoch from the previous run, or empty if the epoch is not found.
+ */
+ private volatile OptionalLong previousBrokerEpoch = OptionalLong.empty();
+
+ public BrokerLifecycleManager(
+ AbstractKafkaConfig config,
+ Time time,
+ String threadNamePrefix,
+ Set<Uuid> logDirs) {
+ this(config, time, threadNamePrefix, logDirs, () -> { });
+ }
+
+ public BrokerLifecycleManager(
+ AbstractKafkaConfig config,
+ Time time,
+ String threadNamePrefix,
+ Set<Uuid> logDirs,
+ Runnable shutdownHook) {
+ this.config = config;
+ this.time = time;
+ this.logDirs = logDirs;
+ this.shutdownHook = shutdownHook;
+        LogContext logContext = new LogContext("[BrokerLifecycleManager id=" + this.config.nodeId() + "] ");
+        this.logger = logContext.logger(BrokerLifecycleManager.class);
+        this.nodeId = config.nodeId();
+        this.rack = config.rack();
+        this.initialTimeoutNs = MILLISECONDS.toNanos(config.initialRegistrationTimeoutMs());
+ this.eventQueue = new KafkaEventQueue(
+ time,
+ logContext,
+ threadNamePrefix + "lifecycle-manager-",
+ new ShutdownEvent());
+ }
+
+ /**
+ * Start the BrokerLifecycleManager.
+ *
+     * @param highestMetadataOffsetProvider Provides the current highest metadata offset.
+     * @param channelManager                The NodeToControllerChannelManager to use.
+     * @param clusterId                     The cluster ID.
+     * @param advertisedListeners           The advertised listeners for this broker.
+ * @param supportedFeatures The features for this broker.
+ * @param previousBrokerEpoch The broker epoch before the reboot.
+ */
+ public void start(Supplier<Long> highestMetadataOffsetProvider,
+ NodeToControllerChannelManager channelManager,
+ String clusterId,
+ ListenerCollection advertisedListeners,
+ Map<String, VersionRange> supportedFeatures,
+ OptionalLong previousBrokerEpoch) {
+ this.previousBrokerEpoch = previousBrokerEpoch;
+ eventQueue.append(new StartupEvent(highestMetadataOffsetProvider,
+            channelManager, clusterId, advertisedListeners, supportedFeatures));
+ }
+
+ public CompletableFuture<Void> setReadyToUnfence() {
+ eventQueue.append(new SetReadyToUnfenceEvent());
+ return initialUnfenceFuture;
+ }
+
+ /**
+ * Propagate directory failures to the controller.
+ *
+ * @param directory The ID for the directory that failed.
+ */
+ public void propagateDirectoryFailure(Uuid directory, long timeout) {
+ eventQueue.append(new OfflineDirEvent(directory));
+        // If we can't communicate the offline directory to the controller, we should shut down.
+        eventQueue.scheduleDeferred("offlineDirFailure",
+            new EventQueue.DeadlineFunction(time.nanoseconds() + MILLISECONDS.toNanos(timeout)),
+ new OfflineDirBrokerFailureEvent(directory));
+ }
+
+ public void resendBrokerRegistration() {
+ eventQueue.append(new ResendBrokerRegistrationEvent());
+ }
+
+ private class ResendBrokerRegistrationEvent implements EventQueue.Event {
+ @Override
+ public void run() {
+ registered = false;
+ scheduleNextCommunicationImmediately();
+ }
+ }
+
+ public long brokerEpoch() {
+ return brokerEpoch;
+ }
+
+ public BrokerState state() {
+ return state;
+ }
+
+ public CompletableFuture<Void> initialCatchUpFuture() {
+ return initialCatchUpFuture;
+ }
+
+ public CompletableFuture<Void> initialUnfenceFuture() {
+ return initialUnfenceFuture;
+ }
+
+ public CompletableFuture<Void> controlledShutdownFuture() {
+ return controlledShutdownFuture;
+ }
+
+ public KafkaEventQueue eventQueue() {
+ return eventQueue;
+ }
+
+ private class BeginControlledShutdownEvent implements EventQueue.Event {
+ @Override
+ public void run() {
+ switch (state) {
+                case PENDING_CONTROLLED_SHUTDOWN ->
+                    logger.info("Attempted to enter pending controlled shutdown state, but we are already in that state.");
+                case RUNNING -> {
+                    logger.info("Beginning controlled shutdown.");
+                    state = BrokerState.PENDING_CONTROLLED_SHUTDOWN;
+                    // Send the next heartbeat immediately in order to let the controller
+                    // begin processing the controlled shutdown as soon as possible.
+                    scheduleNextCommunicationImmediately();
+                }
+                default -> {
+                    logger.info("Skipping controlled shutdown because we are in state {}.", state);
+ beginShutdown();
+ }
+ }
+ }
+ }
+
+ /**
+ * Enter the controlled shutdown state if we are in RUNNING state.
+ * Or, if we're not running, shut down immediately.
+ */
+ public void beginControlledShutdown() {
+ eventQueue.append(new BeginControlledShutdownEvent());
+ }
+
+ /**
+ * Start shutting down the BrokerLifecycleManager, but do not block.
+ */
+ public void beginShutdown() {
+ eventQueue.beginShutdown("beginShutdown");
+ }
+
+ /**
+     * Shut down the BrokerLifecycleManager and block until all threads are joined.
+ */
+ public void close() throws InterruptedException {
+ beginShutdown();
+ eventQueue.close();
+ }
+
+ private class SetReadyToUnfenceEvent implements EventQueue.Event {
+ @Override
+ public void run() {
+ readyToUnfence = true;
+ scheduleNextCommunicationImmediately();
+ }
+ }
+
+ private class OfflineDirEvent implements EventQueue.Event {
+
+ private final Uuid dir;
+
+ OfflineDirEvent(Uuid dir) {
+ this.dir = dir;
+ }
+
+ @Override
+ public void run() {
+ offlineDirs.put(dir, false);
+ if (registered) {
+ scheduleNextCommunicationImmediately();
+ }
+ }
+ }
+
+ private class OfflineDirBrokerFailureEvent implements EventQueue.Event {
+
+ private final Uuid offlineDir;
+
+ OfflineDirBrokerFailureEvent(Uuid offlineDir) {
+ this.offlineDir = offlineDir;
+ }
+
+ @Override
+ public void run() {
+ if (!offlineDirs.getOrDefault(offlineDir, false)) {
+ logger.error("Shutting down because couldn't communicate
offline log dir {} with controllers", offlineDir);
+ shutdownHook.run();
+ }
+ }
+ }
+
+ private class StartupEvent implements EventQueue.Event {
+
+ private final Supplier<Long> highestMetadataOffsetProvider;
+ private final NodeToControllerChannelManager channelManager;
+ private final String clusterId;
+ private final ListenerCollection advertisedListeners;
+ private final Map<String, VersionRange> supportedFeatures;
+
+ StartupEvent(Supplier<Long> highestMetadataOffsetProvider,
+ NodeToControllerChannelManager channelManager,
+ String clusterId,
+ ListenerCollection advertisedListeners,
+ Map<String, VersionRange> supportedFeatures) {
+ this.highestMetadataOffsetProvider = highestMetadataOffsetProvider;
+ this.channelManager = channelManager;
+ this.clusterId = clusterId;
+ this.advertisedListeners = advertisedListeners;
+ this.supportedFeatures = supportedFeatures;
+ }
+
+ @Override
+ public void run() {
+            BrokerLifecycleManager.this.highestMetadataOffsetProvider = this.highestMetadataOffsetProvider;
+            BrokerLifecycleManager.this.channelManager = channelManager;
+            BrokerLifecycleManager.this.channelManager.start();
+            state = BrokerState.STARTING;
+            BrokerLifecycleManager.this.clusterId = clusterId;
+            BrokerLifecycleManager.this.advertisedListeners = advertisedListeners.duplicate();
+            BrokerLifecycleManager.this.supportedFeatures = Map.copyOf(supportedFeatures);
+            eventQueue.scheduleDeferred("initialRegistrationTimeout",
+                new EventQueue.DeadlineFunction(time.nanoseconds() + initialTimeoutNs),
+                new RegistrationTimeoutEvent());
+            sendBrokerRegistration();
+            logger.info("Incarnation {} of broker {} in cluster {} is now STARTING.", incarnationId, nodeId, clusterId);
+ }
+ }
+
+ private void sendBrokerRegistration() {
+        BrokerRegistrationRequestData.FeatureCollection features = new BrokerRegistrationRequestData.FeatureCollection();
+ supportedFeatures.forEach((name, range) ->
+ features.add(new BrokerRegistrationRequestData.Feature()
+ .setName(name)
+ .setMinSupportedVersion(range.min())
+ .setMaxSupportedVersion(range.max()))
+ );
+ List<Uuid> sortedLogDirs = new ArrayList<>(logDirs);
+ sortedLogDirs.sort(Uuid::compareTo);
+        BrokerRegistrationRequestData data = new BrokerRegistrationRequestData()
+ .setBrokerId(nodeId)
+ .setIsMigratingZkBroker(false)
+ .setClusterId(clusterId)
+ .setFeatures(features)
+ .setIncarnationId(incarnationId)
+ .setListeners(advertisedListeners)
+ .setRack(rack.orElse(null))
+ .setPreviousBrokerEpoch(previousBrokerEpoch.orElse(-1L))
+ .setLogDirs(sortedLogDirs);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Sending broker registration {}", data);
+ }
+ channelManager.sendRequest(new BrokerRegistrationRequest.Builder(data),
+ new BrokerRegistrationResponseHandler());
+ communicationInFlight = true;
+ }
+
+ // the response handler is not invoked from the event handler thread,
+ // so it is not safe to update state here, instead, schedule an event
+ // to continue handling the response on the event handler thread
+    private class BrokerRegistrationResponseHandler implements ControllerRequestCompletionHandler {
+        @Override
+        public void onComplete(ClientResponse response) {
+            eventQueue.prepend(new BrokerRegistrationResponseEvent(response, false));
+        }
+
+        @Override
+        public void onTimeout() {
+            logger.info("Unable to register the broker because the RPC got timed out before it could be sent.");
+            eventQueue.prepend(new BrokerRegistrationResponseEvent(null, true));
+ }
+ }
+
+ private class BrokerRegistrationResponseEvent implements EventQueue.Event {
+
+ private final ClientResponse response;
+ private final boolean timedOut;
+
+        BrokerRegistrationResponseEvent(ClientResponse response, boolean timedOut) {
+ this.response = response;
+ this.timedOut = timedOut;
+ }
+
+ @Override
+ public void run() {
+ communicationInFlight = false;
+ if (timedOut) {
+ scheduleNextCommunicationAfterFailure();
+ return;
+ }
+ if (response.authenticationException() != null) {
+ logger.error("Unable to register broker $nodeId because of an
authentication exception.", response.authenticationException());
+ scheduleNextCommunicationAfterFailure();
+ } else if (response.versionMismatch() != null) {
+ logger.error("Unable to register broker $nodeId because of an
API version problem.", response.versionMismatch());
+ scheduleNextCommunicationAfterFailure();
+ } else if (response.responseBody() == null) {
+ logger.warn("Unable to register broker {}.", nodeId);
+ scheduleNextCommunicationAfterFailure();
+ } else if (!(response.responseBody() instanceof
BrokerRegistrationResponse message)) {
+ logger.error("Unable to register broker {} because the
controller returned an invalid response type.", nodeId);
+ scheduleNextCommunicationAfterFailure();
+ } else {
+ Errors errorCode = Errors.forCode(message.data().errorCode());
+ if (errorCode == Errors.NONE) {
+ brokerEpoch = message.data().brokerEpoch();
+ registered = true;
+ initialRegistrationSucceeded = true;
+ logger.info("Successfully registered broker {} with broker
epoch {}", nodeId, brokerEpoch);
+ scheduleNextCommunicationImmediately(); // Immediately
send a heartbeat
+ } else {
+ logger.info("Unable to register broker {} because the
controller returned error {}", nodeId, errorCode);
+ scheduleNextCommunicationAfterFailure();
+ }
+ }
+ }
+ }
+
+ private void sendBrokerHeartbeat() {
+ Long metadataOffset = highestMetadataOffsetProvider.get();
+ BrokerHeartbeatRequestData data = new BrokerHeartbeatRequestData()
+ .setBrokerEpoch(brokerEpoch)
+ .setBrokerId(nodeId)
+ .setCurrentMetadataOffset(metadataOffset)
+ .setWantFence(!readyToUnfence)
+ .setWantShutDown(state == BrokerState.PENDING_CONTROLLED_SHUTDOWN)
+ .setOfflineLogDirs(new ArrayList<>(offlineDirs.keySet()));
+ if (logger.isTraceEnabled()) {
+ logger.trace("Sending broker heartbeat {}", data);
+ }
+        BrokerHeartbeatResponseHandler handler = new BrokerHeartbeatResponseHandler(offlineDirs.keySet());
+        channelManager.sendRequest(new BrokerHeartbeatRequest.Builder(data), handler);
+ communicationInFlight = true;
+ }
+
+ // the response handler is not invoked from the event handler thread,
+ // so it is not safe to update state here, instead, schedule an event
+ // to continue handling the response on the event handler thread
+    private class BrokerHeartbeatResponseHandler implements ControllerRequestCompletionHandler {
+
+ private final Iterable<Uuid> currentOfflineDirs;
+
+ BrokerHeartbeatResponseHandler(Iterable<Uuid> currentOfflineDirs) {
+ this.currentOfflineDirs = currentOfflineDirs;
+ }
+
+ @Override
+ public void onComplete(ClientResponse response) {
+            eventQueue.prepend(new BrokerHeartbeatResponseEvent(response, false, currentOfflineDirs));
+ }
+
+ @Override
+ public void onTimeout() {
+ logger.info("Unable to send a heartbeat because the RPC got timed
out before it could be sent.");
+ eventQueue.prepend(new BrokerHeartbeatResponseEvent(null, true,
currentOfflineDirs));
+ }
+ }
+
+ private class BrokerHeartbeatResponseEvent implements EventQueue.Event {
+
+ private final ClientResponse response;
+ private final boolean timedOut;
+ private final Iterable<Uuid> currentOfflineDirs;
+
+        BrokerHeartbeatResponseEvent(ClientResponse response, boolean timedOut, Iterable<Uuid> currentOfflineDirs) {
+ this.response = response;
+ this.timedOut = timedOut;
+ this.currentOfflineDirs = currentOfflineDirs;
+ }
+
+ @Override
+ public void run() {
+ communicationInFlight = false;
+ if (timedOut) {
+ scheduleNextCommunicationAfterFailure();
+ return;
+ }
+ if (response.authenticationException() != null) {
+ logger.error("Unable to send broker heartbeat for {} because
of an authentication exception.",
+ nodeId, response.authenticationException());
+ scheduleNextCommunicationAfterFailure();
+ } else if (response.versionMismatch() != null) {
+ logger.error("Unable to send broker heartbeat for {} because
of an API version problem.", nodeId, response.versionMismatch());
+ scheduleNextCommunicationAfterFailure();
+ } else if (response.responseBody() == null) {
+ logger.warn("Unable to send broker heartbeat for {}.
Retrying.", nodeId);
+ scheduleNextCommunicationAfterFailure();
+ } else if (!(response.responseBody() instanceof
BrokerHeartbeatResponse message)) {
+ logger.error("Unable to send broker heartbeat for {} because
the controller returned an invalid response type.", nodeId);
+ scheduleNextCommunicationAfterFailure();
+ } else {
+ Errors errorCode = Errors.forCode(message.data().errorCode());
+ if (errorCode == Errors.NONE) {
+ BrokerHeartbeatResponseData responseData = message.data();
+                    currentOfflineDirs.forEach(cur -> offlineDirs.put(cur, true));
+ switch (state) {
+ case STARTING -> {
+ if (responseData.isCaughtUp()) {
+ logger.info("The broker has caught up.
Transitioning from STARTING to RECOVERY.");
+ state = BrokerState.RECOVERY;
+ initialCatchUpFuture.complete(null);
+ } else {
+ logger.debug("The broker is STARTING. Still
waiting to catch up with cluster metadata.");
+ }
+ // Schedule the heartbeat after only 10 ms so that
in the case where
+ // there is no recovery work to be done, we start
up a bit quicker.
+ scheduleNextCommunication(NANOSECONDS.convert(10,
MILLISECONDS));
+ }
+ case RECOVERY -> {
+ if (!responseData.isFenced()) {
+ logger.info("The broker has been unfenced.
Transitioning from RECOVERY to RUNNING.");
+ initialUnfenceFuture.complete(null);
+ state = BrokerState.RUNNING;
+ } else {
+ logger.info("The broker is in RECOVERY.");
+ }
+ scheduleNextCommunicationAfterSuccess();
+ }
+ case RUNNING -> {
+ logger.debug("The broker is RUNNING. Processing
heartbeat response.");
+ scheduleNextCommunicationAfterSuccess();
+ }
+ case PENDING_CONTROLLED_SHUTDOWN -> {
+ if (!responseData.shouldShutDown()) {
+ logger.info("The broker is in
PENDING_CONTROLLED_SHUTDOWN state, still waiting " +
+ "for the active controller.");
+ if (!gotControlledShutdownResponse) {
+                                    // If this is the first pending controlled shutdown response we got,
+                                    // schedule our next heartbeat a little bit sooner than we usually would.
+                                    // In the case where controlled shutdown completes quickly, this will
+                                    // speed things up a little bit.
+                                    scheduleNextCommunication(NANOSECONDS.convert(50, MILLISECONDS));
+ } else {
+ scheduleNextCommunicationAfterSuccess();
+ }
+ } else {
+ logger.info("The controller has asked us to
exit controlled shutdown.");
+ beginShutdown();
+ }
+ gotControlledShutdownResponse = true;
+ }
+ case SHUTTING_DOWN -> logger.info("The broker is
SHUTTING_DOWN. Ignoring heartbeat response.");
+ default -> {
+ logger.error("Unexpected broker state {}", state);
+ scheduleNextCommunicationAfterSuccess();
+ }
+ }
+ } else {
+ logger.warn("Broker {} sent a heartbeat request but
received error {}.", nodeId, errorCode);
+ scheduleNextCommunicationAfterFailure();
+ }
+ }
+ }
+ }
+
+ private void scheduleNextCommunicationImmediately() {
+ scheduleNextCommunication(0);
+ }
+
+ private void scheduleNextCommunicationAfterFailure() {
+        nextSchedulingShouldBeImmediate = false; // never immediately reschedule after a failure
+        scheduleNextCommunication(NANOSECONDS.convert(config.brokerHeartbeatIntervalMs(), MILLISECONDS));
+    }
+
+    private void scheduleNextCommunicationAfterSuccess() {
+        scheduleNextCommunication(NANOSECONDS.convert(config.brokerHeartbeatIntervalMs(), MILLISECONDS));
+ }
+
+ private void scheduleNextCommunication(long intervalNs) {
+        long adjustedIntervalNs = nextSchedulingShouldBeImmediate ? 0 : intervalNs;
+        nextSchedulingShouldBeImmediate = false;
+        logger.trace("Scheduling next communication at {}ms from now.", MILLISECONDS.convert(adjustedIntervalNs, NANOSECONDS));
+        long deadlineNs = time.nanoseconds() + adjustedIntervalNs;
+        eventQueue.scheduleDeferred("communication", new EventQueue.DeadlineFunction(deadlineNs), new CommunicationEvent());
+ }
+
+ private class RegistrationTimeoutEvent implements EventQueue.Event {
+ @Override
+ public void run() {
+ if (!initialRegistrationSucceeded) {
+ logger.error("Shutting down because we were unable to register
with the controller quorum.");
+ eventQueue.beginShutdown("registrationTimeout");
+ }
+ }
+ }
+
+ private class CommunicationEvent implements EventQueue.Event {
+ @Override
+ public void run() {
+ if (communicationInFlight) {
+ logger.trace("Delaying communication because there is already
one in flight.");
+ nextSchedulingShouldBeImmediate = true;
+ } else if (registered) {
+ sendBrokerHeartbeat();
+ } else {
+ sendBrokerRegistration();
+ }
+ }
+ }
+
+ private class ShutdownEvent implements EventQueue.Event {
+ @Override
+ public void run() throws InterruptedException {
+ logger.info("Transitioning from {} to {}.", state,
BrokerState.SHUTTING_DOWN);
+ state = BrokerState.SHUTTING_DOWN;
+ controlledShutdownFuture.complete(null);
+ initialCatchUpFuture.cancel(false);
+ initialUnfenceFuture.cancel(false);
+ if (channelManager != null) {
+ channelManager.shutdown();
+ channelManager = null;
+ }
+ }
+ }
+}
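Taken together, the ported public surface drives a broker's lifetime roughly as follows. This is a hedged sketch: the offset supplier, channel manager, listener collection and feature map are all placeholders that the caller must supply, as BrokerServer.scala does above:

    manager.start(
        () -> highestMetadataOffset.get(),   // Supplier<Long> (placeholder)
        channelManager,                      // NodeToControllerChannelManager (placeholder)
        clusterId,
        advertisedListeners,                 // ListenerCollection (placeholder)
        supportedFeatures,                   // Map<String, VersionRange> (placeholder)
        OptionalLong.empty());               // no broker epoch from a previous run
    manager.initialCatchUpFuture().join();   // completes on STARTING -> RECOVERY
    manager.setReadyToUnfence().join();      // completes on RECOVERY -> RUNNING
    manager.beginControlledShutdown();       // RUNNING -> PENDING_CONTROLLED_SHUTDOWN
    manager.controlledShutdownFuture().join();
    manager.close();                         // blocks until the event-queue thread joins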
diff --git a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
index f0d25f3ca42..e649d6e6035 100644
--- a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
+++ b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
@@ -120,6 +120,22 @@ public abstract class AbstractKafkaConfig extends AbstractConfig {
return interBrokerListenerNameAndSecurityProtocol().getValue();
}
+ public int initialRegistrationTimeoutMs() {
+        return getInt(KRaftConfigs.INITIAL_BROKER_REGISTRATION_TIMEOUT_MS_CONFIG);
+ }
+
+ public int brokerHeartbeatIntervalMs() {
+ return getInt(KRaftConfigs.BROKER_HEARTBEAT_INTERVAL_MS_CONFIG);
+ }
+
+ public Optional<String> rack() {
+        return Optional.ofNullable(getString(ServerConfigs.BROKER_RACK_CONFIG));
+ }
+
+ public int nodeId() {
+ return getInt(KRaftConfigs.NODE_ID_CONFIG);
+ }
+
public Map<ListenerName, SecurityProtocol>
effectiveListenerSecurityProtocolMap() {
Map<ListenerName, SecurityProtocol> mapValue =
getMap(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG,
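The accessors added above replace the vals dropped from KafkaConfig.scala, so call sites move from Scala's Option to java.util.Optional, as the two test hunks earlier in this patch show. Illustratively:

    int id = config.nodeId();                  // was: config.nodeId
    String rack = config.rack().orElse(null);  // was: config.rack.orNull
    int heartbeatMs = config.brokerHeartbeatIntervalMs();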