This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new eaa6b8abdd5 KAFKA-15360: Include dirs in BrokerRegistration #14392 eaa6b8abdd5 is described below commit eaa6b8abdd543fd1fc7152fbdb76643aad6223d6 Author: Igor Soarez <i...@soarez.me> AuthorDate: Mon Sep 11 22:41:06 2023 +0100 KAFKA-15360: Include dirs in BrokerRegistration #14392 BrokerLifecycleManager should send the offline log directories in the BrokerHeartbeatRequests it sends. Also, when handling BrokerHeartbeatResponses, do so by enqueuing a BrokerLifecycleManager event, rather than trying to do the handling directly in the callback. Reviewers: Colin P. McCabe <cmcc...@apache.org>, Proven Provenzano <pprovenz...@confluent.io> --- .../kafka/server/BrokerLifecycleManager.scala | 183 +++++++++++++-------- .../src/main/scala/kafka/server/BrokerServer.scala | 11 +- .../kafka/server/BrokerLifecycleManagerTest.scala | 80 ++++++++- 3 files changed, 202 insertions(+), 72 deletions(-) diff --git a/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala b/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala index 36d0ee43873..a26459e56d8 100644 --- a/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala +++ b/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala @@ -23,7 +23,7 @@ import kafka.utils.Logging import org.apache.kafka.clients.ClientResponse import org.apache.kafka.common.Uuid import org.apache.kafka.common.message.BrokerRegistrationRequestData.ListenerCollection -import org.apache.kafka.common.message.{BrokerHeartbeatRequestData, BrokerRegistrationRequestData} +import org.apache.kafka.common.message.{BrokerHeartbeatRequestData, BrokerHeartbeatResponseData, BrokerRegistrationRequestData, BrokerRegistrationResponseData} import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.requests.{BrokerHeartbeatRequest, BrokerHeartbeatResponse, BrokerRegistrationRequest, BrokerRegistrationResponse} import org.apache.kafka.metadata.{BrokerState, 
VersionRange} @@ -31,10 +31,9 @@ import org.apache.kafka.queue.EventQueue.DeadlineFunction import org.apache.kafka.common.utils.{ExponentialBackoff, LogContext, Time} import org.apache.kafka.queue.{EventQueue, KafkaEventQueue} -import java.util.OptionalLong +import java.util.{Comparator, OptionalLong} import scala.jdk.CollectionConverters._ - /** * The broker lifecycle manager owns the broker state. * @@ -57,7 +56,8 @@ class BrokerLifecycleManager( val config: KafkaConfig, val time: Time, val threadNamePrefix: String, - val isZkBroker: Boolean + val isZkBroker: Boolean, + val logDirs: Set[Uuid] = Set.empty[Uuid] ) extends Logging { private def logPrefix(): String = { @@ -98,7 +98,7 @@ class BrokerLifecycleManager( /** * The number of times we've tried and failed to communicate. This variable can only be - * read or written from the event queue thread. + * read or written from the BrokerToControllerRequestThread. */ private var failedAttempts = 0L @@ -147,6 +147,12 @@ class BrokerLifecycleManager( */ private var readyToUnfence = false + /** + * List of offline directories pending to be sent. + * This variable can only be read or written from the event queue thread. + */ + private var offlineDirsPending = Set[Uuid]() + /** * True if we sent a event queue to the active controller requesting controlled * shutdown. This variable can only be read or written from the event queue thread. @@ -229,6 +235,14 @@ class BrokerLifecycleManager( initialUnfenceFuture } + /** + * Propagate directory failures to the controller. + * @param directory The ID for the directory that failed. 
+ */ + def propagateDirectoryFailure(directory: Uuid): Unit = { + eventQueue.append(new OfflineDirEvent(directory)) + } + def brokerEpoch: Long = _brokerEpoch def state: BrokerState = _state @@ -283,6 +297,19 @@ class BrokerLifecycleManager( } } + private class OfflineDirEvent(val dir: Uuid) extends EventQueue.Event { + override def run(): Unit = { + if (offlineDirsPending.isEmpty) { + offlineDirsPending = Set(dir) + } else { + offlineDirsPending = offlineDirsPending.incl(dir) + } + if (registered) { + scheduleNextCommunicationImmediately() + } + } + } + private class StartupEvent(highestMetadataOffsetProvider: () => Long, channelManager: NodeToControllerChannelManager, clusterId: String, @@ -316,6 +343,11 @@ class BrokerLifecycleManager( setMinSupportedVersion(range.min()). setMaxSupportedVersion(range.max())) } + val sortedLogDirs = new util.ArrayList[Uuid] + logDirs.foreach(sortedLogDirs.add(_)) + sortedLogDirs.sort(new Comparator[Uuid]() { + override def compare(a: Uuid, b: Uuid): Int = a.compareTo(b) + }) val data = new BrokerRegistrationRequestData(). setBrokerId(nodeId). setIsMigratingZkBroker(isZkBroker). @@ -324,7 +356,8 @@ class BrokerLifecycleManager( setIncarnationId(incarnationId). setListeners(_advertisedListeners). setRack(rack.orNull). - setPreviousBrokerEpoch(previousBrokerEpoch.orElse(-1L)) + setPreviousBrokerEpoch(previousBrokerEpoch.orElse(-1L)). 
+ setLogDirs(sortedLogDirs) if (isDebugEnabled) { debug(s"Sending broker registration $data") } @@ -353,12 +386,10 @@ class BrokerLifecycleManager( val message = response.responseBody().asInstanceOf[BrokerRegistrationResponse] val errorCode = Errors.forCode(message.data().errorCode()) if (errorCode == Errors.NONE) { - failedAttempts = 0 - _brokerEpoch = message.data().brokerEpoch() - registered = true - initialRegistrationSucceeded = true - info(s"Successfully registered broker $nodeId with broker epoch ${_brokerEpoch}") - scheduleNextCommunicationImmediately() // Immediately send a heartbeat + // this response handler is not invoked from the event handler thread, + // and processing a successful registration response requires updating + // state, so to continue we need to schedule an event + eventQueue.prepend(new BrokerRegistrationResponseEvent(message.data())) } else { info(s"Unable to register broker $nodeId because the controller returned " + s"error $errorCode") @@ -373,6 +404,17 @@ class BrokerLifecycleManager( } } + private class BrokerRegistrationResponseEvent(response: BrokerRegistrationResponseData) extends EventQueue.Event { + override def run(): Unit = { + failedAttempts = 0 + _brokerEpoch = response.brokerEpoch() + registered = true + initialRegistrationSucceeded = true + info(s"Successfully registered broker $nodeId with broker epoch ${_brokerEpoch}") + scheduleNextCommunicationImmediately() // Immediately send a heartbeat + } + } + private def sendBrokerHeartbeat(): Unit = { val metadataOffset = _highestMetadataOffsetProvider() val data = new BrokerHeartbeatRequestData(). @@ -380,15 +422,16 @@ class BrokerLifecycleManager( setBrokerId(nodeId). setCurrentMetadataOffset(metadataOffset). setWantFence(!readyToUnfence). - setWantShutDown(_state == BrokerState.PENDING_CONTROLLED_SHUTDOWN) + setWantShutDown(_state == BrokerState.PENDING_CONTROLLED_SHUTDOWN). 
+ setOfflineLogDirs(offlineDirsPending.toSeq.asJava) if (isTraceEnabled) { trace(s"Sending broker heartbeat $data") } - _channelManager.sendRequest(new BrokerHeartbeatRequest.Builder(data), - new BrokerHeartbeatResponseHandler()) + val handler = new BrokerHeartbeatResponseHandler(offlineDirsPending) + _channelManager.sendRequest(new BrokerHeartbeatRequest.Builder(data), handler) } - private class BrokerHeartbeatResponseHandler extends ControllerRequestCompletionHandler { + private class BrokerHeartbeatResponseHandler(dirsInFlight: Set[Uuid]) extends ControllerRequestCompletionHandler { override def onComplete(response: ClientResponse): Unit = { if (response.authenticationException() != null) { error(s"Unable to send broker heartbeat for $nodeId because of an " + @@ -409,55 +452,10 @@ class BrokerLifecycleManager( val message = response.responseBody().asInstanceOf[BrokerHeartbeatResponse] val errorCode = Errors.forCode(message.data().errorCode()) if (errorCode == Errors.NONE) { - failedAttempts = 0 - _state match { - case BrokerState.STARTING => - if (message.data().isCaughtUp) { - info(s"The broker has caught up. Transitioning from STARTING to RECOVERY.") - _state = BrokerState.RECOVERY - initialCatchUpFuture.complete(null) - } else { - debug(s"The broker is STARTING. Still waiting to catch up with cluster metadata.") - } - // Schedule the heartbeat after only 10 ms so that in the case where - // there is no recovery work to be done, we start up a bit quicker. - scheduleNextCommunication(NANOSECONDS.convert(10, MILLISECONDS)) - case BrokerState.RECOVERY => - if (!message.data().isFenced) { - info(s"The broker has been unfenced. Transitioning from RECOVERY to RUNNING.") - initialUnfenceFuture.complete(null) - _state = BrokerState.RUNNING - } else { - info(s"The broker is in RECOVERY.") - } - scheduleNextCommunicationAfterSuccess() - case BrokerState.RUNNING => - debug(s"The broker is RUNNING. 
Processing heartbeat response.") - scheduleNextCommunicationAfterSuccess() - case BrokerState.PENDING_CONTROLLED_SHUTDOWN => - if (!message.data().shouldShutDown()) { - info(s"The broker is in PENDING_CONTROLLED_SHUTDOWN state, still waiting " + - "for the active controller.") - if (!gotControlledShutdownResponse) { - // If this is the first pending controlled shutdown response we got, - // schedule our next heartbeat a little bit sooner than we usually would. - // In the case where controlled shutdown completes quickly, this will - // speed things up a little bit. - scheduleNextCommunication(NANOSECONDS.convert(50, MILLISECONDS)) - } else { - scheduleNextCommunicationAfterSuccess() - } - } else { - info(s"The controller has asked us to exit controlled shutdown.") - beginShutdown() - } - gotControlledShutdownResponse = true - case BrokerState.SHUTTING_DOWN => - info(s"The broker is SHUTTING_DOWN. Ignoring heartbeat response.") - case _ => - error(s"Unexpected broker state ${_state}") - scheduleNextCommunicationAfterSuccess() - } + // this response handler is not invoked from the event handler thread, + // and processing a successful heartbeat response requires updating + // state, so to continue we need to schedule an event + eventQueue.prepend(new BrokerHeartbeatResponseEvent(message.data(), dirsInFlight)) } else { warn(s"Broker $nodeId sent a heartbeat request but received error $errorCode.") scheduleNextCommunicationAfterFailure() @@ -471,6 +469,61 @@ class BrokerLifecycleManager( } } + private class BrokerHeartbeatResponseEvent(response: BrokerHeartbeatResponseData, dirsInFlight: Set[Uuid]) extends EventQueue.Event { + override def run(): Unit = { + failedAttempts = 0 + offlineDirsPending = offlineDirsPending.diff(dirsInFlight) + _state match { + case BrokerState.STARTING => + if (response.isCaughtUp) { + info(s"The broker has caught up. 
Transitioning from STARTING to RECOVERY.") + _state = BrokerState.RECOVERY + initialCatchUpFuture.complete(null) + } else { + debug(s"The broker is STARTING. Still waiting to catch up with cluster metadata.") + } + // Schedule the heartbeat after only 10 ms so that in the case where + // there is no recovery work to be done, we start up a bit quicker. + scheduleNextCommunication(NANOSECONDS.convert(10, MILLISECONDS)) + case BrokerState.RECOVERY => + if (!response.isFenced) { + info(s"The broker has been unfenced. Transitioning from RECOVERY to RUNNING.") + initialUnfenceFuture.complete(null) + _state = BrokerState.RUNNING + } else { + info(s"The broker is in RECOVERY.") + } + scheduleNextCommunicationAfterSuccess() + case BrokerState.RUNNING => + debug(s"The broker is RUNNING. Processing heartbeat response.") + scheduleNextCommunicationAfterSuccess() + case BrokerState.PENDING_CONTROLLED_SHUTDOWN => + if (!response.shouldShutDown()) { + info(s"The broker is in PENDING_CONTROLLED_SHUTDOWN state, still waiting " + + "for the active controller.") + if (!gotControlledShutdownResponse) { + // If this is the first pending controlled shutdown response we got, + // schedule our next heartbeat a little bit sooner than we usually would. + // In the case where controlled shutdown completes quickly, this will + // speed things up a little bit. + scheduleNextCommunication(NANOSECONDS.convert(50, MILLISECONDS)) + } else { + scheduleNextCommunicationAfterSuccess() + } + } else { + info(s"The controller has asked us to exit controlled shutdown.") + beginShutdown() + } + gotControlledShutdownResponse = true + case BrokerState.SHUTTING_DOWN => + info(s"The broker is SHUTTING_DOWN. 
Ignoring heartbeat response.") + case _ => + error(s"Unexpected broker state ${_state}") + scheduleNextCommunicationAfterSuccess() + } + } + } + private def scheduleNextCommunicationImmediately(): Unit = scheduleNextCommunication(0) private def scheduleNextCommunicationAfterFailure(): Unit = { diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index 30c25044282..e34fe4c89c9 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -176,11 +176,6 @@ class BrokerServer( config.dynamicConfig.initialize(zkClientOpt = None) - lifecycleManager = new BrokerLifecycleManager(config, - time, - s"broker-${config.nodeId}-", - isZkBroker = false) - /* start scheduler */ kafkaScheduler = new KafkaScheduler(config.backgroundThreads) kafkaScheduler.startup() @@ -207,6 +202,12 @@ class BrokerServer( remoteLogManagerOpt = createRemoteLogManager() + lifecycleManager = new BrokerLifecycleManager(config, + time, + s"broker-${config.nodeId}-", + isZkBroker = false, + logDirs = logManager.directoryIds.values.toSet) + // Enable delegation token cache for all SCRAM mechanisms to simplify dynamic update. // This keeps the cache up-to-date if new SCRAM mechanisms are enabled dynamically. 
tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames) diff --git a/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala b/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala index 77ae31c4c2e..0bc993d55df 100644 --- a/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala @@ -20,13 +20,16 @@ package kafka.server import java.util.{Collections, OptionalLong, Properties} import kafka.utils.TestUtils import org.apache.kafka.common.Node +import org.apache.kafka.common.Uuid import org.apache.kafka.common.message.{BrokerHeartbeatResponseData, BrokerRegistrationResponseData} import org.apache.kafka.common.protocol.Errors -import org.apache.kafka.common.requests.{AbstractRequest, BrokerHeartbeatRequest, BrokerHeartbeatResponse, BrokerRegistrationRequest, BrokerRegistrationResponse} +import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, BrokerHeartbeatRequest, BrokerHeartbeatResponse, BrokerRegistrationRequest, BrokerRegistrationResponse} import org.apache.kafka.metadata.BrokerState -import org.junit.jupiter.api.{Test, Timeout} import org.junit.jupiter.api.Assertions._ +import org.junit.jupiter.api.{Test, Timeout} +import java.util.concurrent.{CompletableFuture, Future} +import scala.jdk.CollectionConverters._ @Timeout(value = 12) class BrokerLifecycleManagerTest { @@ -38,6 +41,7 @@ class BrokerLifecycleManagerTest { properties.setProperty(KafkaConfig.QuorumVotersProp, s"2@localhost:9093") properties.setProperty(KafkaConfig.ControllerListenerNamesProp, "SSL") properties.setProperty(KafkaConfig.InitialBrokerRegistrationTimeoutMsProp, "300000") + properties.setProperty(KafkaConfig.BrokerHeartbeatIntervalMsProp, "100") properties } @@ -182,4 +186,76 @@ class BrokerLifecycleManagerTest { manager.controlledShutdownFuture.get() manager.close() } + + def prepareResponse[T<:AbstractRequest](ctx: 
RegistrationTestContext, response: AbstractResponse): Future[T] = { + val result = new CompletableFuture[T]() + ctx.mockClient.prepareResponseFrom( + (body: AbstractRequest) => result.complete(body.asInstanceOf[T]), + response, + ctx.controllerNodeProvider.node.get + ) + result + } + + def poll[T](context: RegistrationTestContext, manager: BrokerLifecycleManager, future: Future[T]): T = { + while (!future.isDone || context.mockClient.hasInFlightRequests) { + context.poll() + manager.eventQueue.wakeup() + context.time.sleep(100) + } + future.get + } + + @Test + def testOfflineDirsSentUntilHeartbeatSuccess(): Unit = { + val ctx = new RegistrationTestContext(configProperties) + val manager = new BrokerLifecycleManager(ctx.config, ctx.time, "offline-dirs-sent-in-heartbeat-", isZkBroker = false) + val controllerNode = new Node(3000, "localhost", 8021) + ctx.controllerNodeProvider.node.set(controllerNode) + + val registration = prepareResponse(ctx, new BrokerRegistrationResponse(new BrokerRegistrationResponseData().setBrokerEpoch(1000))) + val hb1 = prepareResponse[BrokerHeartbeatRequest](ctx, new BrokerHeartbeatResponse(new BrokerHeartbeatResponseData() + .setErrorCode(Errors.NOT_CONTROLLER.code()))) + val hb2 = prepareResponse[BrokerHeartbeatRequest](ctx, new BrokerHeartbeatResponse(new BrokerHeartbeatResponseData())) + val hb3 = prepareResponse[BrokerHeartbeatRequest](ctx, new BrokerHeartbeatResponse(new BrokerHeartbeatResponseData())) + + val offlineDirs = Set(Uuid.fromString("h3sC4Yk-Q9-fd0ntJTocCA"), Uuid.fromString("ej8Q9_d2Ri6FXNiTxKFiow")) + offlineDirs.foreach(manager.propagateDirectoryFailure) + + // start the manager late to prevent a race, and force expectations on the first heartbeat + manager.start(() => ctx.highestMetadataOffset.get(), + ctx.mockChannelManager, ctx.clusterId, ctx.advertisedListeners, + Collections.emptyMap(), OptionalLong.empty()) + + poll(ctx, manager, registration) + val dirs1 = poll(ctx, manager, hb1).data().offlineLogDirs() + val 
dirs2 = poll(ctx, manager, hb2).data().offlineLogDirs() + val dirs3 = poll(ctx, manager, hb3).data().offlineLogDirs() + + assertEquals(offlineDirs, dirs1.asScala.toSet) + assertEquals(offlineDirs, dirs2.asScala.toSet) + assertEquals(Set.empty, dirs3.asScala.toSet) + manager.close() + } + + @Test + def testRegistrationIncludesDirs(): Unit = { + val logDirs = Set("ad5FLIeCTnaQdai5vOjeng", "ybdzUKmYSLK6oiIpI6CPlw").map(Uuid.fromString) + val ctx = new RegistrationTestContext(configProperties) + val manager = new BrokerLifecycleManager(ctx.config, ctx.time, "registration-includes-dirs-", + isZkBroker = false, logDirs) + val controllerNode = new Node(3000, "localhost", 8021) + ctx.controllerNodeProvider.node.set(controllerNode) + + val registration = prepareResponse(ctx, new BrokerRegistrationResponse(new BrokerRegistrationResponseData().setBrokerEpoch(1000))) + + manager.start(() => ctx.highestMetadataOffset.get(), + ctx.mockChannelManager, ctx.clusterId, ctx.advertisedListeners, + Collections.emptyMap(), OptionalLong.empty()) + val request = poll(ctx, manager, registration).asInstanceOf[BrokerRegistrationRequest] + + assertEquals(logDirs, request.data.logDirs().asScala.toSet) + + manager.close() + } }