This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new eaa6b8abdd5 KAFKA-15360: Include dirs in BrokerRegistration #14392
eaa6b8abdd5 is described below

commit eaa6b8abdd543fd1fc7152fbdb76643aad6223d6
Author: Igor Soarez <i...@soarez.me>
AuthorDate: Mon Sep 11 22:41:06 2023 +0100

    KAFKA-15360: Include dirs in BrokerRegistration #14392
    
    BrokerLifecycleManager should send the offline log directories in the BrokerHeartbeatRequests it
    sends. Also, when handling BrokerHeartbeatResponses, do so by enqueuing a BrokerLifecycleManager
    event, rather than trying to do the handling directly in the callback.
    
    Reviewers: Colin P. McCabe <cmcc...@apache.org>, Proven Provenzano <pprovenz...@confluent.io>
---
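
A sketch of the threading pattern the message above describes, using
hypothetical simplified types (the real code uses KafkaEventQueue). Network
callbacks must not mutate manager state directly, so they enqueue an event
for the single thread that owns the state:

    import java.util.concurrent.ConcurrentLinkedDeque

    final class EventLoopSketch {
      // A single event thread owns all mutable state and drains this queue.
      private val queue = new ConcurrentLinkedDeque[Runnable]()
      private var brokerEpoch: Long = -1L // touched only by the event thread

      // Runs on the network thread: do not touch state here, just prepend
      // an event (cf. eventQueue.prepend in the diff below).
      def onRegistrationResponse(epoch: Long): Unit =
        queue.addFirst(() => handleRegistration(epoch))

      // Runs on the event thread, where state updates are safe.
      private def handleRegistration(epoch: Long): Unit =
        brokerEpoch = epoch

      // Event thread loop body: drain and run pending events.
      def drainOnce(): Unit = {
        var e = queue.pollFirst()
        while (e != null) { e.run(); e = queue.pollFirst() }
      }
    }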
 .../kafka/server/BrokerLifecycleManager.scala      | 183 +++++++++++++--------
 .../src/main/scala/kafka/server/BrokerServer.scala |  11 +-
 .../kafka/server/BrokerLifecycleManagerTest.scala  |  80 ++++++++-
 3 files changed, 202 insertions(+), 72 deletions(-)
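
The offline-directory plumbing amounts to a small at-least-once delivery
scheme: failed directories accumulate in offlineDirsPending, every heartbeat
carries the current snapshot, and a snapshot is removed from the pending set
only once the heartbeat that carried it succeeds. A standalone sketch of that
bookkeeping (hypothetical names, assuming single-threaded access as on the
event queue thread):

    final class PendingDirsSketch {
      private var pending = Set.empty[String] // offline dirs not yet acknowledged

      // cf. OfflineDirEvent: record a failure to be reported.
      def dirFailed(dir: String): Unit = pending += dir

      // Snapshot attached to the next heartbeat request.
      def snapshotForHeartbeat(): Set[String] = pending

      // cf. BrokerHeartbeatResponseEvent: drop only what that request carried,
      // so directories that failed while it was in flight are retried on the
      // next heartbeat.
      def heartbeatSucceeded(inFlight: Set[String]): Unit =
        pending = pending.diff(inFlight)
    }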

diff --git a/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala b/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
index 36d0ee43873..a26459e56d8 100644
--- a/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
+++ b/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
@@ -23,7 +23,7 @@ import kafka.utils.Logging
 import org.apache.kafka.clients.ClientResponse
 import org.apache.kafka.common.Uuid
 import org.apache.kafka.common.message.BrokerRegistrationRequestData.ListenerCollection
-import org.apache.kafka.common.message.{BrokerHeartbeatRequestData, BrokerRegistrationRequestData}
+import org.apache.kafka.common.message.{BrokerHeartbeatRequestData, BrokerHeartbeatResponseData, BrokerRegistrationRequestData, BrokerRegistrationResponseData}
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{BrokerHeartbeatRequest, BrokerHeartbeatResponse, BrokerRegistrationRequest, BrokerRegistrationResponse}
 import org.apache.kafka.metadata.{BrokerState, VersionRange}
@@ -31,10 +31,9 @@ import org.apache.kafka.queue.EventQueue.DeadlineFunction
 import org.apache.kafka.common.utils.{ExponentialBackoff, LogContext, Time}
 import org.apache.kafka.queue.{EventQueue, KafkaEventQueue}
 
-import java.util.OptionalLong
+import java.util.{Comparator, OptionalLong}
 import scala.jdk.CollectionConverters._
 
-
 /**
  * The broker lifecycle manager owns the broker state.
  *
@@ -57,7 +56,8 @@ class BrokerLifecycleManager(
   val config: KafkaConfig,
   val time: Time,
   val threadNamePrefix: String,
-  val isZkBroker: Boolean
+  val isZkBroker: Boolean,
+  val logDirs: Set[Uuid] = Set.empty[Uuid]
 ) extends Logging {
 
   private def logPrefix(): String = {
@@ -98,7 +98,7 @@ class BrokerLifecycleManager(
 
   /**
   * The number of times we've tried and failed to communicate.  This variable can only be
-   * read or written from the event queue thread.
+   * read or written from the BrokerToControllerRequestThread.
    */
   private var failedAttempts = 0L
 
@@ -147,6 +147,12 @@ class BrokerLifecycleManager(
    */
   private var readyToUnfence = false
 
+  /**
+   * List of offline directories pending to be sent.
+   * This variable can only be read or written from the event queue thread.
+   */
+  private var offlineDirsPending = Set[Uuid]()
+
   /**
   * True if we sent a event queue to the active controller requesting controlled
   * shutdown.  This variable can only be read or written from the event queue thread.
@@ -229,6 +235,14 @@ class BrokerLifecycleManager(
     initialUnfenceFuture
   }
 
+  /**
+   * Propagate directory failures to the controller.
+   * @param directory The ID for the directory that failed.
+   */
+  def propagateDirectoryFailure(directory: Uuid): Unit = {
+    eventQueue.append(new OfflineDirEvent(directory))
+  }
+
   def brokerEpoch: Long = _brokerEpoch
 
   def state: BrokerState = _state
@@ -283,6 +297,19 @@ class BrokerLifecycleManager(
     }
   }
 
+  private class OfflineDirEvent(val dir: Uuid) extends EventQueue.Event {
+    override def run(): Unit = {
+      if (offlineDirsPending.isEmpty) {
+        offlineDirsPending = Set(dir)
+      } else {
+        offlineDirsPending = offlineDirsPending.incl(dir)
+      }
+      if (registered) {
+        scheduleNextCommunicationImmediately()
+      }
+    }
+  }
+
   private class StartupEvent(highestMetadataOffsetProvider: () => Long,
                      channelManager: NodeToControllerChannelManager,
                      clusterId: String,
@@ -316,6 +343,11 @@ class BrokerLifecycleManager(
         setMinSupportedVersion(range.min()).
         setMaxSupportedVersion(range.max()))
     }
+    val sortedLogDirs = new util.ArrayList[Uuid]
+    logDirs.foreach(sortedLogDirs.add(_))
+    sortedLogDirs.sort(new Comparator[Uuid]() {
+      override def compare(a: Uuid, b: Uuid): Int = a.compareTo(b)
+    })
     val data = new BrokerRegistrationRequestData().
         setBrokerId(nodeId).
         setIsMigratingZkBroker(isZkBroker).
@@ -324,7 +356,8 @@ class BrokerLifecycleManager(
         setIncarnationId(incarnationId).
         setListeners(_advertisedListeners).
         setRack(rack.orNull).
-        setPreviousBrokerEpoch(previousBrokerEpoch.orElse(-1L))
+        setPreviousBrokerEpoch(previousBrokerEpoch.orElse(-1L)).
+        setLogDirs(sortedLogDirs)
     if (isDebugEnabled) {
       debug(s"Sending broker registration $data")
     }
@@ -353,12 +386,10 @@ class BrokerLifecycleManager(
        val message = response.responseBody().asInstanceOf[BrokerRegistrationResponse]
         val errorCode = Errors.forCode(message.data().errorCode())
         if (errorCode == Errors.NONE) {
-          failedAttempts = 0
-          _brokerEpoch = message.data().brokerEpoch()
-          registered = true
-          initialRegistrationSucceeded = true
-          info(s"Successfully registered broker $nodeId with broker epoch 
${_brokerEpoch}")
-          scheduleNextCommunicationImmediately() // Immediately send a 
heartbeat
+          // this response handler is not invoked from the event handler 
thread,
+          // and processing a successful registration response requires 
updating
+          // state, so to continue we need to schedule an event
+          eventQueue.prepend(new 
BrokerRegistrationResponseEvent(message.data()))
         } else {
           info(s"Unable to register broker $nodeId because the controller 
returned " +
             s"error $errorCode")
@@ -373,6 +404,17 @@ class BrokerLifecycleManager(
     }
   }
 
+  private class BrokerRegistrationResponseEvent(response: BrokerRegistrationResponseData) extends EventQueue.Event {
+    override def run(): Unit = {
+      failedAttempts = 0
+      _brokerEpoch = response.brokerEpoch()
+      registered = true
+      initialRegistrationSucceeded = true
+      info(s"Successfully registered broker $nodeId with broker epoch 
${_brokerEpoch}")
+      scheduleNextCommunicationImmediately() // Immediately send a heartbeat
+    }
+  }
+
   private def sendBrokerHeartbeat(): Unit = {
     val metadataOffset = _highestMetadataOffsetProvider()
     val data = new BrokerHeartbeatRequestData().
@@ -380,15 +422,16 @@ class BrokerLifecycleManager(
       setBrokerId(nodeId).
       setCurrentMetadataOffset(metadataOffset).
       setWantFence(!readyToUnfence).
-      setWantShutDown(_state == BrokerState.PENDING_CONTROLLED_SHUTDOWN)
+      setWantShutDown(_state == BrokerState.PENDING_CONTROLLED_SHUTDOWN).
+      setOfflineLogDirs(offlineDirsPending.toSeq.asJava)
     if (isTraceEnabled) {
       trace(s"Sending broker heartbeat $data")
     }
-    _channelManager.sendRequest(new BrokerHeartbeatRequest.Builder(data),
-      new BrokerHeartbeatResponseHandler())
+    val handler = new BrokerHeartbeatResponseHandler(offlineDirsPending)
+    _channelManager.sendRequest(new BrokerHeartbeatRequest.Builder(data), handler)
   }
 
-  private class BrokerHeartbeatResponseHandler extends ControllerRequestCompletionHandler {
+  private class BrokerHeartbeatResponseHandler(dirsInFlight: Set[Uuid]) extends ControllerRequestCompletionHandler {
     override def onComplete(response: ClientResponse): Unit = {
       if (response.authenticationException() != null) {
         error(s"Unable to send broker heartbeat for $nodeId because of an " +
@@ -409,55 +452,10 @@ class BrokerLifecycleManager(
        val message = response.responseBody().asInstanceOf[BrokerHeartbeatResponse]
         val errorCode = Errors.forCode(message.data().errorCode())
         if (errorCode == Errors.NONE) {
-          failedAttempts = 0
-          _state match {
-            case BrokerState.STARTING =>
-              if (message.data().isCaughtUp) {
-                info(s"The broker has caught up. Transitioning from STARTING 
to RECOVERY.")
-                _state = BrokerState.RECOVERY
-                initialCatchUpFuture.complete(null)
-              } else {
-                debug(s"The broker is STARTING. Still waiting to catch up with 
cluster metadata.")
-              }
-              // Schedule the heartbeat after only 10 ms so that in the case where
-              // there is no recovery work to be done, we start up a bit quicker.
-              scheduleNextCommunication(NANOSECONDS.convert(10, MILLISECONDS))
-            case BrokerState.RECOVERY =>
-              if (!message.data().isFenced) {
-                info(s"The broker has been unfenced. Transitioning from 
RECOVERY to RUNNING.")
-                initialUnfenceFuture.complete(null)
-                _state = BrokerState.RUNNING
-              } else {
-                info(s"The broker is in RECOVERY.")
-              }
-              scheduleNextCommunicationAfterSuccess()
-            case BrokerState.RUNNING =>
-              debug(s"The broker is RUNNING. Processing heartbeat response.")
-              scheduleNextCommunicationAfterSuccess()
-            case BrokerState.PENDING_CONTROLLED_SHUTDOWN =>
-              if (!message.data().shouldShutDown()) {
-                info(s"The broker is in PENDING_CONTROLLED_SHUTDOWN state, 
still waiting " +
-                  "for the active controller.")
-                if (!gotControlledShutdownResponse) {
-                  // If this is the first pending controlled shutdown response we got,
-                  // schedule our next heartbeat a little bit sooner than we usually would.
-                  // In the case where controlled shutdown completes quickly, this will
-                  // speed things up a little bit.
-                  scheduleNextCommunication(NANOSECONDS.convert(50, MILLISECONDS))
-                } else {
-                  scheduleNextCommunicationAfterSuccess()
-                }
-              } else {
-                info(s"The controller has asked us to exit controlled 
shutdown.")
-                beginShutdown()
-              }
-              gotControlledShutdownResponse = true
-            case BrokerState.SHUTTING_DOWN =>
-              info(s"The broker is SHUTTING_DOWN. Ignoring heartbeat 
response.")
-            case _ =>
-              error(s"Unexpected broker state ${_state}")
-              scheduleNextCommunicationAfterSuccess()
-          }
+          // this response handler is not invoked from the event handler thread,
+          // and processing a successful heartbeat response requires updating
+          // state, so to continue we need to schedule an event
+          eventQueue.prepend(new BrokerHeartbeatResponseEvent(message.data(), dirsInFlight))
         } else {
           warn(s"Broker $nodeId sent a heartbeat request but received error 
$errorCode.")
           scheduleNextCommunicationAfterFailure()
@@ -471,6 +469,61 @@ class BrokerLifecycleManager(
     }
   }
 
+  private class BrokerHeartbeatResponseEvent(response: BrokerHeartbeatResponseData, dirsInFlight: Set[Uuid]) extends EventQueue.Event {
+    override def run(): Unit = {
+      failedAttempts = 0
+      offlineDirsPending = offlineDirsPending.diff(dirsInFlight)
+      _state match {
+        case BrokerState.STARTING =>
+          if (response.isCaughtUp) {
+            info(s"The broker has caught up. Transitioning from STARTING to 
RECOVERY.")
+            _state = BrokerState.RECOVERY
+            initialCatchUpFuture.complete(null)
+          } else {
+            debug(s"The broker is STARTING. Still waiting to catch up with 
cluster metadata.")
+          }
+          // Schedule the heartbeat after only 10 ms so that in the case where
+          // there is no recovery work to be done, we start up a bit quicker.
+          scheduleNextCommunication(NANOSECONDS.convert(10, MILLISECONDS))
+        case BrokerState.RECOVERY =>
+          if (!response.isFenced) {
+            info(s"The broker has been unfenced. Transitioning from RECOVERY 
to RUNNING.")
+            initialUnfenceFuture.complete(null)
+            _state = BrokerState.RUNNING
+          } else {
+            info(s"The broker is in RECOVERY.")
+          }
+          scheduleNextCommunicationAfterSuccess()
+        case BrokerState.RUNNING =>
+          debug(s"The broker is RUNNING. Processing heartbeat response.")
+          scheduleNextCommunicationAfterSuccess()
+        case BrokerState.PENDING_CONTROLLED_SHUTDOWN =>
+          if (!response.shouldShutDown()) {
+            info(s"The broker is in PENDING_CONTROLLED_SHUTDOWN state, still 
waiting " +
+              "for the active controller.")
+            if (!gotControlledShutdownResponse) {
+              // If this is the first pending controlled shutdown response we got,
+              // schedule our next heartbeat a little bit sooner than we usually would.
+              // In the case where controlled shutdown completes quickly, this will
+              // speed things up a little bit.
+              scheduleNextCommunication(NANOSECONDS.convert(50, MILLISECONDS))
+            } else {
+              scheduleNextCommunicationAfterSuccess()
+            }
+          } else {
+            info(s"The controller has asked us to exit controlled shutdown.")
+            beginShutdown()
+          }
+          gotControlledShutdownResponse = true
+        case BrokerState.SHUTTING_DOWN =>
+          info(s"The broker is SHUTTING_DOWN. Ignoring heartbeat response.")
+        case _ =>
+          error(s"Unexpected broker state ${_state}")
+          scheduleNextCommunicationAfterSuccess()
+      }
+    }
+  }
+
   private def scheduleNextCommunicationImmediately(): Unit = scheduleNextCommunication(0)
 
   private def scheduleNextCommunicationAfterFailure(): Unit = {
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index 30c25044282..e34fe4c89c9 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -176,11 +176,6 @@ class BrokerServer(
 
       config.dynamicConfig.initialize(zkClientOpt = None)
 
-      lifecycleManager = new BrokerLifecycleManager(config,
-        time,
-        s"broker-${config.nodeId}-",
-        isZkBroker = false)
-
       /* start scheduler */
       kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
       kafkaScheduler.startup()
@@ -207,6 +202,12 @@ class BrokerServer(
 
       remoteLogManagerOpt = createRemoteLogManager()
 
+      lifecycleManager = new BrokerLifecycleManager(config,
+        time,
+        s"broker-${config.nodeId}-",
+        isZkBroker = false,
+        logDirs = logManager.directoryIds.values.toSet)
+
       // Enable delegation token cache for all SCRAM mechanisms to simplify dynamic update.
       // This keeps the cache up-to-date if new SCRAM mechanisms are enabled dynamically.
       tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
diff --git a/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala b/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
index 77ae31c4c2e..0bc993d55df 100644
--- a/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
@@ -20,13 +20,16 @@ package kafka.server
 import java.util.{Collections, OptionalLong, Properties}
 import kafka.utils.TestUtils
 import org.apache.kafka.common.Node
+import org.apache.kafka.common.Uuid
 import org.apache.kafka.common.message.{BrokerHeartbeatResponseData, BrokerRegistrationResponseData}
 import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.{AbstractRequest, BrokerHeartbeatRequest, BrokerHeartbeatResponse, BrokerRegistrationRequest, BrokerRegistrationResponse}
+import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, BrokerHeartbeatRequest, BrokerHeartbeatResponse, BrokerRegistrationRequest, BrokerRegistrationResponse}
 import org.apache.kafka.metadata.BrokerState
-import org.junit.jupiter.api.{Test, Timeout}
 import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.{Test, Timeout}
 
+import java.util.concurrent.{CompletableFuture, Future}
+import scala.jdk.CollectionConverters._
 
 @Timeout(value = 12)
 class BrokerLifecycleManagerTest {
@@ -38,6 +41,7 @@ class BrokerLifecycleManagerTest {
     properties.setProperty(KafkaConfig.QuorumVotersProp, s"2@localhost:9093")
     properties.setProperty(KafkaConfig.ControllerListenerNamesProp, "SSL")
     properties.setProperty(KafkaConfig.InitialBrokerRegistrationTimeoutMsProp, "300000")
+    properties.setProperty(KafkaConfig.BrokerHeartbeatIntervalMsProp, "100")
     properties
   }
 
@@ -182,4 +186,76 @@ class BrokerLifecycleManagerTest {
     manager.controlledShutdownFuture.get()
     manager.close()
   }
+
+  def prepareResponse[T<:AbstractRequest](ctx: RegistrationTestContext, response: AbstractResponse): Future[T] = {
+    val result = new CompletableFuture[T]()
+    ctx.mockClient.prepareResponseFrom(
+      (body: AbstractRequest) => result.complete(body.asInstanceOf[T]),
+      response,
+      ctx.controllerNodeProvider.node.get
+    )
+    result
+  }
+
+  def poll[T](context: RegistrationTestContext, manager: BrokerLifecycleManager, future: Future[T]): T = {
+    while (!future.isDone || context.mockClient.hasInFlightRequests) {
+      context.poll()
+      manager.eventQueue.wakeup()
+      context.time.sleep(100)
+    }
+    future.get
+  }
+
+  @Test
+  def testOfflineDirsSentUntilHeartbeatSuccess(): Unit = {
+    val ctx = new RegistrationTestContext(configProperties)
+    val manager = new BrokerLifecycleManager(ctx.config, ctx.time, "offline-dirs-sent-in-heartbeat-", isZkBroker = false)
+    val controllerNode = new Node(3000, "localhost", 8021)
+    ctx.controllerNodeProvider.node.set(controllerNode)
+
+    val registration = prepareResponse(ctx, new BrokerRegistrationResponse(new BrokerRegistrationResponseData().setBrokerEpoch(1000)))
+    val hb1 = prepareResponse[BrokerHeartbeatRequest](ctx, new BrokerHeartbeatResponse(new BrokerHeartbeatResponseData()
+      .setErrorCode(Errors.NOT_CONTROLLER.code())))
+    val hb2 = prepareResponse[BrokerHeartbeatRequest](ctx, new BrokerHeartbeatResponse(new BrokerHeartbeatResponseData()))
+    val hb3 = prepareResponse[BrokerHeartbeatRequest](ctx, new BrokerHeartbeatResponse(new BrokerHeartbeatResponseData()))
+
+    val offlineDirs = Set(Uuid.fromString("h3sC4Yk-Q9-fd0ntJTocCA"), Uuid.fromString("ej8Q9_d2Ri6FXNiTxKFiow"))
+    offlineDirs.foreach(manager.propagateDirectoryFailure)
+
+    // start the manager late to prevent a race, and force expectations on the first heartbeat
+    manager.start(() => ctx.highestMetadataOffset.get(),
+      ctx.mockChannelManager, ctx.clusterId, ctx.advertisedListeners,
+      Collections.emptyMap(), OptionalLong.empty())
+
+    poll(ctx, manager, registration)
+    val dirs1 = poll(ctx, manager, hb1).data().offlineLogDirs()
+    val dirs2 = poll(ctx, manager, hb2).data().offlineLogDirs()
+    val dirs3 = poll(ctx, manager, hb3).data().offlineLogDirs()
+
+    assertEquals(offlineDirs, dirs1.asScala.toSet)
+    assertEquals(offlineDirs, dirs2.asScala.toSet)
+    assertEquals(Set.empty, dirs3.asScala.toSet)
+    manager.close()
+  }
+
+  @Test
+  def testRegistrationIncludesDirs(): Unit = {
+    val logDirs = Set("ad5FLIeCTnaQdai5vOjeng", "ybdzUKmYSLK6oiIpI6CPlw").map(Uuid.fromString)
+    val ctx = new RegistrationTestContext(configProperties)
+    val manager = new BrokerLifecycleManager(ctx.config, ctx.time, "registration-includes-dirs-",
+      isZkBroker = false, logDirs)
+    val controllerNode = new Node(3000, "localhost", 8021)
+    ctx.controllerNodeProvider.node.set(controllerNode)
+
+    val registration = prepareResponse(ctx, new BrokerRegistrationResponse(new BrokerRegistrationResponseData().setBrokerEpoch(1000)))
+
+    manager.start(() => ctx.highestMetadataOffset.get(),
+      ctx.mockChannelManager, ctx.clusterId, ctx.advertisedListeners,
+      Collections.emptyMap(), OptionalLong.empty())
+    val request = poll(ctx, manager, registration).asInstanceOf[BrokerRegistrationRequest]
+
+    assertEquals(logDirs, request.data.logDirs().asScala.toSet)
+
+    manager.close()
+  }
 }
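
The failure path that will call propagateDirectoryFailure is not part of this
commit, which only adds the manager-side API. A hedged sketch of the assumed
call site (DirFailurePropagator and LogDirFailureHandlerSketch are
hypothetical stand-ins, and java.util.UUID stands in for Kafka's Uuid):

    import java.util.UUID

    trait DirFailurePropagator {
      def propagateDirectoryFailure(dirId: UUID): Unit
    }

    final class LogDirFailureHandlerSketch(manager: DirFailurePropagator) {
      // Assumed runtime path: on an IO error in a log directory, notify the
      // lifecycle manager, which queues the directory ID and repeats it in
      // heartbeats until the controller acknowledges one.
      def onDirFailure(dirId: UUID): Unit =
        manager.propagateDirectoryFailure(dirId)
    }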
