This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch 4.0 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push: new 7e51a2a43ba KAFKA-19294: Fix BrokerLifecycleManager RPC timeouts (#19745) 7e51a2a43ba is described below commit 7e51a2a43bac271ae6780511c10d9a6014d0a2c1 Author: Colin Patrick McCabe <cmcc...@apache.org> AuthorDate: Tue Jun 24 16:23:25 2025 -0700 KAFKA-19294: Fix BrokerLifecycleManager RPC timeouts (#19745) Previously, we could wait for up to half of the broker session timeout for an RPC to complete, and then delay by up to half of the broker session timeout. When taken together, these two delays could lead to brokers erroneously missing heartbeats. This change removes exponential backoff for heartbeats sent from the broker to the controller. The load caused by heartbeats is not heavy, and controllers can easily time out heartbeats when the queue length is too long. Additionally, we now set the maximum RPC time to the length of the broker heartbeat interval. This minimizes the impact of heavy load. Reviewers: José Armando García Sancio <jsan...@apache.org>, David Arthur <mum...@gmail.com> --- .../scala/kafka/server/BrokerLifecycleManager.scala | 21 +++------------------ core/src/main/scala/kafka/server/BrokerServer.scala | 2 +- 2 files changed, 4 insertions(+), 19 deletions(-) diff --git a/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala b/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala index 36c666b6467..fdcbbf5964c 100644 --- a/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala +++ b/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala @@ -28,7 +28,7 @@ import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.requests.{BrokerHeartbeatRequest, BrokerHeartbeatResponse, BrokerRegistrationRequest, BrokerRegistrationResponse} import org.apache.kafka.metadata.{BrokerState, VersionRange} import org.apache.kafka.queue.EventQueue.DeadlineFunction -import org.apache.kafka.common.utils.{ExponentialBackoff, LogContext, Time} +import 
org.apache.kafka.common.utils.{LogContext, Time} import org.apache.kafka.queue.{EventQueue, KafkaEventQueue} import org.apache.kafka.server.common.{ControllerRequestCompletionHandler, NodeToControllerChannelManager} @@ -93,18 +93,6 @@ class BrokerLifecycleManager( private val initialTimeoutNs = MILLISECONDS.toNanos(config.initialRegistrationTimeoutMs.longValue()) - /** - * The exponential backoff to use for resending communication. - */ - private val resendExponentialBackoff = - new ExponentialBackoff(100, 2, config.brokerSessionTimeoutMs.toLong / 2, 0.02) - - /** - * The number of times we've tried and failed to communicate. This variable can only be - * read or written from the BrokerToControllerRequestThread. - */ - private var failedAttempts = 0L - /** * The broker incarnation ID. This ID uniquely identifies each time we start the broker */ @@ -449,7 +437,6 @@ class BrokerLifecycleManager( val message = response.responseBody().asInstanceOf[BrokerRegistrationResponse] val errorCode = Errors.forCode(message.data().errorCode()) if (errorCode == Errors.NONE) { - failedAttempts = 0 _brokerEpoch = message.data().brokerEpoch() registered = true initialRegistrationSucceeded = true @@ -523,7 +510,6 @@ class BrokerLifecycleManager( val errorCode = Errors.forCode(message.data().errorCode()) if (errorCode == Errors.NONE) { val responseData = message.data() - failedAttempts = 0 currentOfflineDirs.foreach(cur => offlineDirs.put(cur, true)) _state match { case BrokerState.STARTING => @@ -586,10 +572,9 @@ class BrokerLifecycleManager( } private def scheduleNextCommunicationAfterFailure(): Unit = { - val delayMs = resendExponentialBackoff.backoff(failedAttempts) - failedAttempts = failedAttempts + 1 nextSchedulingShouldBeImmediate = false // never immediately reschedule after a failure - scheduleNextCommunication(NANOSECONDS.convert(delayMs, MILLISECONDS)) + scheduleNextCommunication(NANOSECONDS.convert( + config.brokerHeartbeatIntervalMs.longValue() , MILLISECONDS)) } private 
def scheduleNextCommunicationAfterSuccess(): Unit = { diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index 4fdd3a0779a..85612c58d4a 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -405,7 +405,7 @@ class BrokerServer( config, "heartbeat", s"broker-${config.nodeId}-", - config.brokerSessionTimeoutMs / 2 // KAFKA-14392 + config.brokerHeartbeatIntervalMs ) lifecycleManager.start( () => sharedServer.loader.lastAppliedOffset(),