This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 148be487859 KAFKA-20012 Move NodeToControllerChannelManagerImpl to 
server module (#21261)
148be487859 is described below

commit 148be4878594ff145c0fe753815012fb6a43dff0
Author: Chih-Yuan Chien <[email protected]>
AuthorDate: Thu Jan 15 23:34:22 2026 +0800

    KAFKA-20012 Move NodeToControllerChannelManagerImpl to server module 
(#21261)
    
    Migrates `NodeToControllerChannelManager` and related classes from Scala
    (core module) to Java (server module).
    
    New Java Classes (server module)  include `ControllerInformation`,
    `RaftControllerNodeProvider`, `NodeToControllerChannelManagerImpl`,
    `NodeToControllerRequestThread` and `NodeToControllerQueueItem`.
    
    In `AbstractKafkaConfig`,  added
    `addReconfigurable()`/`removeReconfigurable()` abstract methods to
    support  migration.  In `KafkaConfig`, added `override` keywords for
    reconfigurable methods.   Updated imports and usages across core module
    classes and tests.  Replace `ControllerNodeProvider`  by
    `Supplier<ControllerInformation>`.
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../scala/kafka/server/AlterPartitionManager.scala |  20 +-
 .../src/main/scala/kafka/server/BrokerServer.scala |  10 +-
 .../main/scala/kafka/server/ControllerServer.scala |   4 +-
 core/src/main/scala/kafka/server/KafkaConfig.scala |   4 +-
 .../server/NodeToControllerChannelManager.scala    | 342 ---------------------
 .../server/NodeToControllerRequestThreadTest.scala | 107 ++++---
 .../unit/kafka/server/ForwardingManagerTest.scala  |  24 +-
 .../MockNodeToControllerChannelManager.scala       |  17 +-
 .../kafka/server/RegistrationTestContext.scala     |   8 +-
 .../common/NodeToControllerChannelManager.java     |   2 +-
 .../apache/kafka/server/ControllerInformation.java |  39 +++
 .../server/NodeToControllerChannelManagerImpl.java | 174 +++++++++++
 .../kafka/server/NodeToControllerQueueItem.java    |  31 +-
 .../server/NodeToControllerRequestThread.java      | 197 ++++++++++++
 .../kafka/server/RaftControllerNodeProvider.java   |  69 +++++
 .../kafka/server/config/AbstractKafkaConfig.java   |  21 ++
 .../server/BrokerRegistrationRequestTest.java      |  17 +-
 .../transaction/AddPartitionsToTxnManagerTest.java |  12 +-
 18 files changed, 639 insertions(+), 459 deletions(-)

diff --git a/core/src/main/scala/kafka/server/AlterPartitionManager.scala 
b/core/src/main/scala/kafka/server/AlterPartitionManager.scala
index 9f5052349c8..3b060a35d8f 100644
--- a/core/src/main/scala/kafka/server/AlterPartitionManager.scala
+++ b/core/src/main/scala/kafka/server/AlterPartitionManager.scala
@@ -29,7 +29,9 @@ import org.apache.kafka.common.utils.Time
 import org.apache.kafka.metadata.{LeaderAndIsr, LeaderRecoveryState}
 import org.apache.kafka.server.common.{ControllerRequestCompletionHandler, 
NodeToControllerChannelManager, TopicIdPartition}
 import org.apache.kafka.server.util.Scheduler
+import org.apache.kafka.server.{ControllerInformation, 
NodeToControllerChannelManagerImpl}
 
+import java.util.function.Supplier
 import scala.collection.mutable
 import scala.collection.mutable.ListBuffer
 import scala.jdk.OptionConverters.RichOptional
@@ -66,7 +68,7 @@ object AlterPartitionManager {
   def apply(
     config: KafkaConfig,
     scheduler: Scheduler,
-    controllerNodeProvider: ControllerNodeProvider,
+    controllerNodeProvider: Supplier[ControllerInformation],
     time: Time,
     metrics: Metrics,
     threadNamePrefix: String,
@@ -74,12 +76,12 @@ object AlterPartitionManager {
   ): AlterPartitionManager = {
     val channelManager = new NodeToControllerChannelManagerImpl(
       controllerNodeProvider,
-      time = time,
-      metrics = metrics,
-      config = config,
-      channelName = "alter-partition",
-      threadNamePrefix = threadNamePrefix,
-      retryTimeoutMs = Long.MaxValue
+      time,
+      metrics,
+      config,
+      "alter-partition",
+      threadNamePrefix,
+      Long.MaxValue
     )
     new DefaultAlterPartitionManager(
       controllerChannelManager = channelManager,
@@ -150,7 +152,7 @@ class DefaultAlterPartitionManager(
     val request = buildRequest(inflightAlterPartitionItems, brokerEpoch)
     debug(s"Sending AlterPartition to controller $request")
 
-    // We will not timeout AlterPartition request, instead letting it retry 
indefinitely
+    // We will not time out AlterPartition request, instead letting it retry 
indefinitely
     // until a response is received, or a new LeaderAndIsr overwrites the 
existing isrState
     // which causes the response for those partitions to be ignored.
     controllerChannelManager.sendRequest(request,
@@ -159,7 +161,7 @@ class DefaultAlterPartitionManager(
           debug(s"Received AlterPartition response $response")
           val error = try {
             if (response.authenticationException != null) {
-              // For now we treat authentication errors as retriable. We use 
the
+              // For now, we treat authentication errors as retriable. We use 
the
               // `NETWORK_EXCEPTION` error code for lack of a good alternative.
               // Note that `NodeToControllerChannelManager` will still log the
               // authentication errors so that users have a chance to fix the 
problem.
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala 
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 2d21ee59eff..67ea88b37fb 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -59,6 +59,8 @@ import org.apache.kafka.server.{AssignmentsManager, 
BrokerFeatures, ClientMetric
 import org.apache.kafka.server.transaction.AddPartitionsToTxnManager
 import org.apache.kafka.storage.internals.log.LogDirFailureChannel
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
+import org.apache.kafka.server.NodeToControllerChannelManagerImpl
+import org.apache.kafka.server.RaftControllerNodeProvider
 
 import java.time.Duration
 import java.util
@@ -233,16 +235,16 @@ class BrokerServer(
         "controller quorum voters future",
         sharedServer.controllerQuorumVotersFuture,
         startupDeadline, time)
-      val controllerNodeProvider = RaftControllerNodeProvider(raftManager, 
config)
+      val controllerNodeProvider = 
RaftControllerNodeProvider.create(raftManager, config)
 
       clientToControllerChannelManager = new 
NodeToControllerChannelManagerImpl(
         controllerNodeProvider,
         time,
         metrics,
         config,
-        channelName = "forwarding",
+        "forwarding",
         s"broker-${config.nodeId}-",
-        retryTimeoutMs = 60000
+        60000
       )
       clientToControllerChannelManager.start()
       forwardingManager = new 
ForwardingManagerImpl(clientToControllerChannelManager, metrics)
@@ -316,7 +318,7 @@ class BrokerServer(
         config,
         "directory-assignments",
         s"broker-${config.nodeId}-",
-        retryTimeoutMs = 60000
+        60000
       )
       assignmentsManager = new AssignmentsManager(
         time,
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala 
b/core/src/main/scala/kafka/server/ControllerServer.scala
index 81af28dc495..966ade4403d 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -51,6 +51,8 @@ import org.apache.kafka.server.metrics.{KafkaMetricsGroup, 
KafkaYammerMetrics, L
 import org.apache.kafka.server.network.{EndpointReadyFutures, 
KafkaAuthorizerServerInfo}
 import org.apache.kafka.server.policy.{AlterConfigPolicy, CreateTopicPolicy}
 import org.apache.kafka.server.util.{Deadline, FutureUtils}
+import org.apache.kafka.server.NodeToControllerChannelManagerImpl
+import org.apache.kafka.server.RaftControllerNodeProvider
 
 import java.util
 import java.util.{Optional, OptionalLong}
@@ -405,7 +407,7 @@ class ControllerServer(
       /**
        * Start the KIP-919 controller registration manager.
        */
-      val controllerNodeProvider = RaftControllerNodeProvider(raftManager, 
config)
+      val controllerNodeProvider = 
RaftControllerNodeProvider.create(raftManager, config)
       registrationChannelManager = new NodeToControllerChannelManagerImpl(
         controllerNodeProvider,
         time,
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 706a359afed..2092d36d7f9 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -420,11 +420,11 @@ class KafkaConfig private(doLog: Boolean, val props: 
util.Map[_, _])
   val unstableApiVersionsEnabled = 
getBoolean(ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG)
   val unstableFeatureVersionsEnabled = 
getBoolean(ServerConfigs.UNSTABLE_FEATURE_VERSIONS_ENABLE_CONFIG)
 
-  def addReconfigurable(reconfigurable: Reconfigurable): Unit = {
+  override def addReconfigurable(reconfigurable: Reconfigurable): Unit = {
     dynamicConfig.addReconfigurable(reconfigurable)
   }
 
-  def removeReconfigurable(reconfigurable: Reconfigurable): Unit = {
+  override def removeReconfigurable(reconfigurable: Reconfigurable): Unit = {
     dynamicConfig.removeReconfigurable(reconfigurable)
   }
 
diff --git 
a/core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala 
b/core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala
deleted file mode 100644
index 0caa03ec052..00000000000
--- a/core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala
+++ /dev/null
@@ -1,342 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-import kafka.utils.Logging
-import org.apache.kafka.clients._
-import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.network._
-import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.AbstractRequest
-import org.apache.kafka.common.security.JaasContext
-import org.apache.kafka.common.security.auth.SecurityProtocol
-import org.apache.kafka.common.utils.{LogContext, Time}
-import org.apache.kafka.common.{Node, Reconfigurable}
-import org.apache.kafka.raft.RaftManager
-import org.apache.kafka.server.common.{ApiMessageAndVersion, 
ControllerRequestCompletionHandler, NodeToControllerChannelManager}
-import org.apache.kafka.server.util.{InterBrokerSendThread, 
RequestAndCompletionHandler}
-
-import java.util
-import java.util.Optional
-import java.util.concurrent.LinkedBlockingDeque
-import java.util.concurrent.atomic.AtomicReference
-import scala.collection.Seq
-import scala.jdk.CollectionConverters._
-import scala.jdk.OptionConverters.{RichOption, RichOptional, RichOptionalInt}
-
-case class ControllerInformation(
-  node: Option[Node],
-  listenerName: ListenerName,
-  securityProtocol: SecurityProtocol,
-  saslMechanism: String
-)
-
-trait ControllerNodeProvider {
-  def getControllerInfo(): ControllerInformation
-}
-
-object RaftControllerNodeProvider {
-  def apply(
-    raftManager: RaftManager[ApiMessageAndVersion],
-    config: KafkaConfig,
-  ): RaftControllerNodeProvider = {
-    val controllerListenerName = new 
ListenerName(config.controllerListenerNames.get(0))
-    val controllerSecurityProtocol = 
Option(config.effectiveListenerSecurityProtocolMap.get(controllerListenerName))
-      .getOrElse(SecurityProtocol.forName(controllerListenerName.value()))
-    val controllerSaslMechanism = config.saslMechanismControllerProtocol
-    new RaftControllerNodeProvider(
-      raftManager,
-      controllerListenerName,
-      controllerSecurityProtocol,
-      controllerSaslMechanism
-    )
-  }
-}
-
-/**
- * Finds the controller node by checking the metadata log manager.
- * This provider is used when we are using a Raft-based metadata quorum.
- */
-class RaftControllerNodeProvider(
-  val raftManager: RaftManager[ApiMessageAndVersion],
-  val listenerName: ListenerName,
-  val securityProtocol: SecurityProtocol,
-  val saslMechanism: String
-) extends ControllerNodeProvider with Logging {
-
-  private def idToNode(id: Int): Option[Node] = 
raftManager.client.voterNode(id, listenerName).toScala
-
-  override def getControllerInfo(): ControllerInformation =
-    
ControllerInformation(raftManager.client.leaderAndEpoch.leaderId.toScala.flatMap(idToNode),
-      listenerName, securityProtocol, saslMechanism)
-}
-
-/**
- * This class manages the connection between a broker and the controller. It 
runs a single
- * [[NodeToControllerRequestThread]] which uses the broker's metadata cache as 
its own metadata to find
- * and connect to the controller. The channel is async and runs the network 
connection in the background.
- * The maximum number of in-flight requests are set to one to ensure orderly 
response from the controller, therefore
- * care must be taken to not block on outstanding requests for too long.
- */
-class NodeToControllerChannelManagerImpl(
-  controllerNodeProvider: ControllerNodeProvider,
-  time: Time,
-  metrics: Metrics,
-  config: KafkaConfig,
-  channelName: String,
-  threadNamePrefix: String,
-  retryTimeoutMs: Long
-) extends NodeToControllerChannelManager with Logging {
-  private val logContext = new LogContext(s"[NodeToControllerChannelManager 
id=${config.nodeId} name=${channelName}] ")
-  private val manualMetadataUpdater = new ManualMetadataUpdater()
-  private val apiVersions = new ApiVersions()
-  private val requestThread = newRequestThread
-
-  def start(): Unit = {
-    requestThread.start()
-  }
-
-  def shutdown(): Unit = {
-    requestThread.shutdown()
-    info(s"Node to controller channel manager for $channelName shutdown")
-  }
-
-  private[server] def newRequestThread = {
-    def buildNetworkClient(controllerInfo: ControllerInformation) = {
-      val channelBuilder = ChannelBuilders.clientChannelBuilder(
-        controllerInfo.securityProtocol,
-        JaasContext.Type.SERVER,
-        config,
-        controllerInfo.listenerName,
-        controllerInfo.saslMechanism,
-        time,
-        logContext
-      )
-      channelBuilder match {
-        case reconfigurable: Reconfigurable => 
config.addReconfigurable(reconfigurable)
-        case _ =>
-      }
-      val selector = new Selector(
-        NetworkReceive.UNLIMITED,
-        Selector.NO_IDLE_TIMEOUT_MS,
-        metrics,
-        time,
-        channelName,
-        Map("BrokerId" -> config.brokerId.toString).asJava,
-        false,
-        channelBuilder,
-        logContext
-      )
-      new NetworkClient(
-        selector,
-        manualMetadataUpdater,
-        config.brokerId.toString,
-        1,
-        50,
-        50,
-        Selectable.USE_DEFAULT_BUFFER_SIZE,
-        Selectable.USE_DEFAULT_BUFFER_SIZE,
-        Math.min(Int.MaxValue, Math.min(config.controllerSocketTimeoutMs, 
retryTimeoutMs)).toInt, // request timeout should not exceed the provided retry 
timeout
-        config.connectionSetupTimeoutMs,
-        config.connectionSetupTimeoutMaxMs,
-        time,
-        true,
-        apiVersions,
-        logContext,
-        MetadataRecoveryStrategy.NONE
-      )
-    }
-    val threadName = 
s"${threadNamePrefix}to-controller-${channelName}-channel-manager"
-
-    val controllerInformation = controllerNodeProvider.getControllerInfo()
-    new NodeToControllerRequestThread(
-      buildNetworkClient(controllerInformation),
-      manualMetadataUpdater,
-      controllerNodeProvider,
-      config,
-      time,
-      threadName,
-      retryTimeoutMs
-    )
-  }
-
-  /**
-   * Send request to the controller.
-   *
-   * @param request         The request to be sent.
-   * @param callback        Request completion callback.
-   */
-  def sendRequest(
-    request: AbstractRequest.Builder[_ <: AbstractRequest],
-    callback: ControllerRequestCompletionHandler
-  ): Unit = {
-    requestThread.enqueue(NodeToControllerQueueItem(
-      time.milliseconds(),
-      request,
-      callback
-    ))
-  }
-
-  def controllerApiVersions(): Optional[NodeApiVersions] = {
-    requestThread.activeControllerAddress().flatMap { activeController =>
-      Option(apiVersions.get(activeController.idString))
-    }.toJava
-  }
-
-  def getTimeoutMs: Long = retryTimeoutMs
-}
-
-case class NodeToControllerQueueItem(
-  createdTimeMs: Long,
-  request: AbstractRequest.Builder[_ <: AbstractRequest],
-  callback: ControllerRequestCompletionHandler
-)
-
-class NodeToControllerRequestThread(
-  initialNetworkClient: KafkaClient,
-  metadataUpdater: ManualMetadataUpdater,
-  controllerNodeProvider: ControllerNodeProvider,
-  config: KafkaConfig,
-  time: Time,
-  threadName: String,
-  retryTimeoutMs: Long
-) extends InterBrokerSendThread(
-  threadName,
-  initialNetworkClient,
-  Math.min(Int.MaxValue, Math.min(config.controllerSocketTimeoutMs, 
retryTimeoutMs)).toInt,
-  time,
-  false
-) with Logging {
-
-  this.logIdent = logPrefix
-
-  private val requestQueue = new 
LinkedBlockingDeque[NodeToControllerQueueItem]()
-  private val activeController = new AtomicReference[Node](null)
-
-  // Used for testing
-  @volatile
-  private[server] var started = false
-
-  def activeControllerAddress(): Option[Node] = {
-    Option(activeController.get())
-  }
-
-  private def updateControllerAddress(newActiveController: Node): Unit = {
-    activeController.set(newActiveController)
-  }
-
-  def enqueue(request: NodeToControllerQueueItem): Unit = {
-    if (!started) {
-      throw new IllegalStateException("Cannot enqueue a request if the request 
thread is not running")
-    }
-    requestQueue.add(request)
-    if (activeControllerAddress().isDefined) {
-      wakeup()
-    }
-  }
-
-  def queueSize: Int = {
-    requestQueue.size
-  }
-
-  override def generateRequests(): 
util.Collection[RequestAndCompletionHandler] = {
-    val currentTimeMs = time.milliseconds()
-    val requestIter = requestQueue.iterator()
-    while (requestIter.hasNext) {
-      val request = requestIter.next
-      if (currentTimeMs - request.createdTimeMs >= retryTimeoutMs) {
-        requestIter.remove()
-        request.callback.onTimeout()
-      } else {
-        val controllerAddress = activeControllerAddress()
-        if (controllerAddress.isDefined) {
-          requestIter.remove()
-          return util.Collections.singletonList(new 
RequestAndCompletionHandler(
-            time.milliseconds(),
-            controllerAddress.get,
-            request.request,
-            response => handleResponse(request)(response)
-          ))
-        }
-      }
-    }
-    util.Collections.emptyList()
-  }
-
-  private[server] def handleResponse(queueItem: 
NodeToControllerQueueItem)(response: ClientResponse): Unit = {
-    debug(s"Request ${queueItem.request} received $response")
-    if (response.authenticationException != null) {
-      error(s"Request ${queueItem.request} failed due to authentication error 
with controller. Disconnecting the " +
-        s"connection to the stale controller 
${activeControllerAddress().map(_.idString).getOrElse("null")}",
-        response.authenticationException)
-      maybeDisconnectAndUpdateController()
-      queueItem.callback.onComplete(response)
-    } else if (response.versionMismatch != null) {
-      error(s"Request ${queueItem.request} failed due to unsupported version 
error",
-        response.versionMismatch)
-      queueItem.callback.onComplete(response)
-    } else if (response.wasDisconnected()) {
-      updateControllerAddress(null)
-      requestQueue.putFirst(queueItem)
-    } else if 
(response.responseBody().errorCounts().containsKey(Errors.NOT_CONTROLLER)) {
-      debug(s"Request ${queueItem.request} received NOT_CONTROLLER exception. 
Disconnecting the " +
-        s"connection to the stale controller 
${activeControllerAddress().map(_.idString).getOrElse("null")}")
-      maybeDisconnectAndUpdateController()
-      requestQueue.putFirst(queueItem)
-    } else {
-      queueItem.callback.onComplete(response)
-    }
-  }
-
-  private def maybeDisconnectAndUpdateController(): Unit = {
-    // just close the controller connection and wait for metadata cache update 
in doWork
-    activeControllerAddress().foreach { controllerAddress =>
-      try {
-        // We don't care if disconnect has an error, just log it and get a new 
network client
-        networkClient.disconnect(controllerAddress.idString)
-      } catch {
-        case t: Throwable => error("Had an error while disconnecting from 
NetworkClient.", t)
-      }
-      updateControllerAddress(null)
-    }
-  }
-
-  override def doWork(): Unit = {
-    val controllerInformation = controllerNodeProvider.getControllerInfo()
-    if (activeControllerAddress().isDefined) {
-      super.pollOnce(Long.MaxValue)
-    } else {
-      debug("Controller isn't cached, looking for local metadata changes")
-      controllerInformation.node match {
-        case Some(controllerNode) =>
-          info(s"Recorded new KRaft controller, from now on will use node 
$controllerNode")
-          updateControllerAddress(controllerNode)
-          metadataUpdater.setNodes(Seq(controllerNode).asJava)
-        case None =>
-          // need to backoff to avoid tight loops
-          debug("No controller provided, retrying after backoff")
-          super.pollOnce(100)
-      }
-    }
-  }
-
-  override def start(): Unit = {
-    super.start()
-    started = true
-  }
-}
diff --git 
a/core/src/test/scala/kafka/server/NodeToControllerRequestThreadTest.scala 
b/core/src/test/scala/kafka/server/NodeToControllerRequestThreadTest.scala
index 46c7237dafb..a90f9034d02 100644
--- a/core/src/test/scala/kafka/server/NodeToControllerRequestThreadTest.scala
+++ b/core/src/test/scala/kafka/server/NodeToControllerRequestThreadTest.scala
@@ -31,15 +31,22 @@ import 
org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import 
org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
 import org.apache.kafka.common.utils.MockTime
 import org.apache.kafka.server.common.ControllerRequestCompletionHandler
+import org.apache.kafka.server.ControllerInformation
+import org.apache.kafka.server.NodeToControllerRequestThread
+import org.apache.kafka.server.NodeToControllerQueueItem
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Test
 import org.mockito.Mockito._
 
+import java.util.Optional
+import java.util.function.Supplier
+import scala.jdk.OptionConverters.RichOption
+
 
 class NodeToControllerRequestThreadTest {
 
   private def controllerInfo(node: Option[Node]): ControllerInformation = {
-    ControllerInformation(node, new ListenerName(""), 
SecurityProtocol.PLAINTEXT, "")
+    new ControllerInformation(node.toJava, new ListenerName(""), 
SecurityProtocol.PLAINTEXT, "")
   }
 
   private def emptyControllerInfo: ControllerInformation = {
@@ -52,18 +59,18 @@ class NodeToControllerRequestThreadTest {
     val config = new KafkaConfig(TestUtils.createBrokerConfig(1))
     val metadata = mock(classOf[Metadata])
     val mockClient = new MockClient(time, metadata)
-    val controllerNodeProvider = mock(classOf[ControllerNodeProvider])
+    val controllerNodeProvider = mock(classOf[Supplier[ControllerInformation]])
 
-    
when(controllerNodeProvider.getControllerInfo()).thenReturn(emptyControllerInfo)
+    when(controllerNodeProvider.get()).thenReturn(emptyControllerInfo)
 
     val retryTimeoutMs = 30000
     val testRequestThread = new NodeToControllerRequestThread(
       mockClient,  new ManualMetadataUpdater(),
       controllerNodeProvider, config, time, "", retryTimeoutMs)
-    testRequestThread.started = true
+    testRequestThread.setStarted(true)
 
     val completionHandler = new TestControllerRequestCompletionHandler(None)
-    val queueItem = NodeToControllerQueueItem(
+    val queueItem = new NodeToControllerQueueItem(
       time.milliseconds(),
       new MetadataRequest.Builder(new MetadataRequestData()),
       completionHandler
@@ -89,20 +96,20 @@ class NodeToControllerRequestThreadTest {
     val metadata = mock(classOf[Metadata])
     val mockClient = new MockClient(time, metadata)
 
-    val controllerNodeProvider = mock(classOf[ControllerNodeProvider])
+    val controllerNodeProvider = mock(classOf[Supplier[ControllerInformation]])
     val activeController = new Node(controllerId, "host", 1234)
 
-    
when(controllerNodeProvider.getControllerInfo()).thenReturn(controllerInfo(Some(activeController)))
+    
when(controllerNodeProvider.get()).thenReturn(controllerInfo(Some(activeController)))
 
     val expectedResponse = RequestTestUtils.metadataUpdateWith(2, 
java.util.Map.of("a", 2))
     val testRequestThread = new NodeToControllerRequestThread(
       mockClient,  new ManualMetadataUpdater(),
-      controllerNodeProvider, config, time, "", retryTimeoutMs = Long.MaxValue)
-    testRequestThread.started = true
+      controllerNodeProvider, config, time, "", Long.MaxValue)
+    testRequestThread.setStarted(true)
     mockClient.prepareResponse(expectedResponse)
 
     val completionHandler = new 
TestControllerRequestCompletionHandler(Some(expectedResponse))
-    val queueItem = NodeToControllerQueueItem(
+    val queueItem = new NodeToControllerQueueItem(
       time.milliseconds(),
       new MetadataRequest.Builder(new MetadataRequestData()),
       completionHandler
@@ -131,21 +138,21 @@ class NodeToControllerRequestThreadTest {
     val metadata = mock(classOf[Metadata])
     val mockClient = new MockClient(time, metadata)
 
-    val controllerNodeProvider = mock(classOf[ControllerNodeProvider])
+    val controllerNodeProvider = mock(classOf[Supplier[ControllerInformation]])
     val oldController = new Node(oldControllerId, "host1", 1234)
     val newController = new Node(newControllerId, "host2", 1234)
 
-    when(controllerNodeProvider.getControllerInfo()).thenReturn(
+    when(controllerNodeProvider.get()).thenReturn(
       controllerInfo(Some(oldController)), controllerInfo(Some(newController)))
 
     val expectedResponse = RequestTestUtils.metadataUpdateWith(3, 
java.util.Map.of("a", 2))
     val testRequestThread = new NodeToControllerRequestThread(
       mockClient, new ManualMetadataUpdater(),
-      controllerNodeProvider, config, time, "", retryTimeoutMs = Long.MaxValue)
-    testRequestThread.started = true
+      controllerNodeProvider, config, time, "", Long.MaxValue)
+    testRequestThread.setStarted(true)
 
     val completionHandler = new 
TestControllerRequestCompletionHandler(Some(expectedResponse))
-    val queueItem = NodeToControllerQueueItem(
+    val queueItem = new NodeToControllerQueueItem(
       time.milliseconds(),
       new MetadataRequest.Builder(new MetadataRequestData()),
       completionHandler,
@@ -179,12 +186,12 @@ class NodeToControllerRequestThreadTest {
     val metadata = mock(classOf[Metadata])
     val mockClient = new MockClient(time, metadata)
 
-    val controllerNodeProvider = mock(classOf[ControllerNodeProvider])
+    val controllerNodeProvider = mock(classOf[Supplier[ControllerInformation]])
     val port = 1234
     val oldController = new Node(oldControllerId, "host1", port)
     val newController = new Node(newControllerId, "host2", port)
 
-    when(controllerNodeProvider.getControllerInfo()).thenReturn(
+    when(controllerNodeProvider.get()).thenReturn(
       controllerInfo(Some(oldController)), controllerInfo(Some(newController)))
 
     val responseWithNotControllerError = 
RequestTestUtils.metadataUpdateWith("cluster1", 2,
@@ -193,11 +200,11 @@ class NodeToControllerRequestThreadTest {
     val expectedResponse = RequestTestUtils.metadataUpdateWith(3, 
java.util.Map.of("a", 2))
     val testRequestThread = new NodeToControllerRequestThread(
       mockClient, new ManualMetadataUpdater(),
-      controllerNodeProvider, config, time, "", retryTimeoutMs = Long.MaxValue)
-    testRequestThread.started = true
+      controllerNodeProvider, config, time, "", Long.MaxValue)
+    testRequestThread.setStarted(true)
 
     val completionHandler = new 
TestControllerRequestCompletionHandler(Some(expectedResponse))
-    val queueItem = NodeToControllerQueueItem(
+    val queueItem = new NodeToControllerQueueItem(
       time.milliseconds(),
       new MetadataRequest.Builder(new MetadataRequestData()
         .setAllowAutoTopicCreation(true)),
@@ -208,7 +215,7 @@ class NodeToControllerRequestThreadTest {
     testRequestThread.doWork()
 
     val oldBrokerNode = new Node(oldControllerId, "host1", port)
-    assertEquals(Some(oldBrokerNode), 
testRequestThread.activeControllerAddress())
+    assertEquals(Some(oldBrokerNode).toJava, 
testRequestThread.activeControllerAddress())
 
     // send and process the request
     mockClient.prepareResponse((body: AbstractRequest) => {
@@ -216,7 +223,7 @@ class NodeToControllerRequestThreadTest {
       body.asInstanceOf[MetadataRequest].allowAutoTopicCreation()
     }, responseWithNotControllerError)
     testRequestThread.doWork()
-    assertEquals(None, testRequestThread.activeControllerAddress())
+    assertEquals(Optional.empty(), testRequestThread.activeControllerAddress())
     // reinitialize the controller to a different node
     testRequestThread.doWork()
     // process the request again
@@ -224,7 +231,7 @@ class NodeToControllerRequestThreadTest {
     testRequestThread.doWork()
 
     val newControllerNode = new Node(newControllerId, "host2", port)
-    assertEquals(Some(newControllerNode), 
testRequestThread.activeControllerAddress())
+    assertEquals(Some(newControllerNode).toJava, 
testRequestThread.activeControllerAddress())
 
     assertTrue(completionHandler.completed.get())
   }
@@ -241,12 +248,12 @@ class NodeToControllerRequestThreadTest {
     // enable envelope API
     mockClient.setNodeApiVersions(NodeApiVersions.create(ApiKeys.ENVELOPE.id, 
0.toShort, 0.toShort))
 
-    val controllerNodeProvider = mock(classOf[ControllerNodeProvider])
+    val controllerNodeProvider = mock(classOf[Supplier[ControllerInformation]])
     val port = 1234
     val oldController = new Node(oldControllerId, "host1", port)
     val newController = new Node(newControllerId, "host2", port)
 
-    when(controllerNodeProvider.getControllerInfo()).thenReturn(
+    when(controllerNodeProvider.get()).thenReturn(
       controllerInfo(Some(oldController)),
       controllerInfo(Some(newController))
     )
@@ -260,8 +267,8 @@ class NodeToControllerRequestThreadTest {
 
     val testRequestThread = new NodeToControllerRequestThread(
       mockClient,  new ManualMetadataUpdater(),
-      controllerNodeProvider, config, time, "", retryTimeoutMs = Long.MaxValue)
-    testRequestThread.started = true
+      controllerNodeProvider, config, time, "", Long.MaxValue)
+    testRequestThread.setStarted(true)
 
     val completionHandler = new 
TestControllerRequestCompletionHandler(Some(expectedResponse))
     val kafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, 
"principal", true)
@@ -271,7 +278,7 @@ class NodeToControllerRequestThreadTest {
     val envelopeRequestBuilder = new 
EnvelopeRequest.Builder(ByteBuffer.allocate(0),
       kafkaPrincipalBuilder.serialize(kafkaPrincipal), 
"client-address".getBytes)
 
-    val queueItem = NodeToControllerQueueItem(
+    val queueItem = new NodeToControllerQueueItem(
       time.milliseconds(),
       envelopeRequestBuilder,
       completionHandler
@@ -282,7 +289,7 @@ class NodeToControllerRequestThreadTest {
     testRequestThread.doWork()
 
     val oldBrokerNode = new Node(oldControllerId, "host1", port)
-    assertEquals(Some(oldBrokerNode), 
testRequestThread.activeControllerAddress())
+    assertEquals(Some(oldBrokerNode).toJava, 
testRequestThread.activeControllerAddress())
 
     // send and process the envelope request
     mockClient.prepareResponse((body: AbstractRequest) => {
@@ -290,7 +297,7 @@ class NodeToControllerRequestThreadTest {
     }, envelopeResponseWithNotControllerError)
     testRequestThread.doWork()
     // expect to reset the activeControllerAddress after finding the 
NOT_CONTROLLER error
-    assertEquals(None, testRequestThread.activeControllerAddress())
+    assertEquals(Optional.empty(), testRequestThread.activeControllerAddress())
     // reinitialize the controller to a different node
     testRequestThread.doWork()
     // process the request again
@@ -298,7 +305,7 @@ class NodeToControllerRequestThreadTest {
     testRequestThread.doWork()
 
     val newControllerNode = new Node(newControllerId, "host2", port)
-    assertEquals(Some(newControllerNode), 
testRequestThread.activeControllerAddress())
+    assertEquals(Some(newControllerNode).toJava, 
testRequestThread.activeControllerAddress())
 
     assertTrue(completionHandler.completed.get())
   }
@@ -312,10 +319,10 @@ class NodeToControllerRequestThreadTest {
     val metadata = mock(classOf[Metadata])
     val mockClient = new MockClient(time, metadata)
 
-    val controllerNodeProvider = mock(classOf[ControllerNodeProvider])
+    val controllerNodeProvider = mock(classOf[Supplier[ControllerInformation]])
     val controller = new Node(controllerId, "host1", 1234)
 
-    
when(controllerNodeProvider.getControllerInfo()).thenReturn(controllerInfo(Some(controller)))
+    
when(controllerNodeProvider.get()).thenReturn(controllerInfo(Some(controller)))
 
     val retryTimeoutMs = 30000
     val responseWithNotControllerError = 
RequestTestUtils.metadataUpdateWith("cluster1", 2,
@@ -324,10 +331,10 @@ class NodeToControllerRequestThreadTest {
     val testRequestThread = new NodeToControllerRequestThread(
       mockClient, new ManualMetadataUpdater(),
       controllerNodeProvider, config, time, "", retryTimeoutMs)
-    testRequestThread.started = true
+    testRequestThread.setStarted(true)
 
     val completionHandler = new TestControllerRequestCompletionHandler()
-    val queueItem = NodeToControllerQueueItem(
+    val queueItem = new NodeToControllerQueueItem(
       time.milliseconds(),
       new MetadataRequest.Builder(new MetadataRequestData()
         .setAllowAutoTopicCreation(true)),
@@ -361,10 +368,10 @@ class NodeToControllerRequestThreadTest {
     val metadata = mock(classOf[Metadata])
     val mockClient = new MockClient(time, metadata)
 
-    val controllerNodeProvider = mock(classOf[ControllerNodeProvider])
+    val controllerNodeProvider = mock(classOf[Supplier[ControllerInformation]])
     val activeController = new Node(controllerId, "host", 1234)
 
-    
when(controllerNodeProvider.getControllerInfo()).thenReturn(controllerInfo(Some(activeController)))
+    
when(controllerNodeProvider.get()).thenReturn(controllerInfo(Some(activeController)))
 
     val callbackResponse = new AtomicReference[ClientResponse]()
     val completionHandler = new ControllerRequestCompletionHandler {
@@ -372,7 +379,7 @@ class NodeToControllerRequestThreadTest {
       override def onComplete(response: ClientResponse): Unit = 
callbackResponse.set(response)
     }
 
-    val queueItem = NodeToControllerQueueItem(
+    val queueItem = new NodeToControllerQueueItem(
       time.milliseconds(),
       new MetadataRequest.Builder(new MetadataRequestData()),
       completionHandler
@@ -382,8 +389,8 @@ class NodeToControllerRequestThreadTest {
 
     val testRequestThread = new NodeToControllerRequestThread(
       mockClient,   new ManualMetadataUpdater(),
-      controllerNodeProvider, config, time, "", retryTimeoutMs = Long.MaxValue)
-    testRequestThread.started = true
+      controllerNodeProvider, config, time, "", Long.MaxValue)
+    testRequestThread.setStarted(true)
 
     testRequestThread.enqueue(queueItem)
     pollUntil(testRequestThread, () => callbackResponse.get != null)
@@ -399,10 +406,10 @@ class NodeToControllerRequestThreadTest {
     val metadata = mock(classOf[Metadata])
     val mockClient = new MockClient(time, metadata)
 
-    val controllerNodeProvider = mock(classOf[ControllerNodeProvider])
+    val controllerNodeProvider = mock(classOf[Supplier[ControllerInformation]])
     val activeController = new Node(controllerId, "host", 1234)
 
-    
when(controllerNodeProvider.getControllerInfo()).thenReturn(controllerInfo(Some(activeController)))
+    
when(controllerNodeProvider.get()).thenReturn(controllerInfo(Some(activeController)))
 
     val callbackResponse = new AtomicReference[ClientResponse]()
     val completionHandler = new ControllerRequestCompletionHandler {
@@ -410,7 +417,7 @@ class NodeToControllerRequestThreadTest {
       override def onComplete(response: ClientResponse): Unit = 
callbackResponse.set(response)
     }
 
-    val queueItem = NodeToControllerQueueItem(
+    val queueItem = new NodeToControllerQueueItem(
       time.milliseconds(),
       new MetadataRequest.Builder(new MetadataRequestData()),
       completionHandler
@@ -420,13 +427,13 @@ class NodeToControllerRequestThreadTest {
 
     val testRequestThread = new NodeToControllerRequestThread(
       mockClient,  new ManualMetadataUpdater(),
-      controllerNodeProvider, config, time, "", retryTimeoutMs = Long.MaxValue)
-    testRequestThread.started = true
+      controllerNodeProvider, config, time, "", Long.MaxValue)
+    testRequestThread.setStarted(true)
 
     testRequestThread.enqueue(queueItem)
     pollUntil(testRequestThread, () => callbackResponse.get != null)
     assertNotNull(callbackResponse.get.authenticationException)
-    assertEquals(None, testRequestThread.activeControllerAddress())
+    assertEquals(Optional.empty(), testRequestThread.activeControllerAddress())
   }
 
   @Test
@@ -438,15 +445,15 @@ class NodeToControllerRequestThreadTest {
     val metadata = mock(classOf[Metadata])
     val mockClient = new MockClient(time, metadata)
 
-    val controllerNodeProvider = mock(classOf[ControllerNodeProvider])
-    
when(controllerNodeProvider.getControllerInfo()).thenReturn(emptyControllerInfo)
+    val controllerNodeProvider = mock(classOf[Supplier[ControllerInformation]])
+    when(controllerNodeProvider.get()).thenReturn(emptyControllerInfo)
 
     val testRequestThread = new NodeToControllerRequestThread(
       mockClient,   new ManualMetadataUpdater(),
-      controllerNodeProvider, config, time, "", retryTimeoutMs = Long.MaxValue)
+      controllerNodeProvider, config, time, "", Long.MaxValue)
 
     val completionHandler = new TestControllerRequestCompletionHandler(None)
-    val queueItem = NodeToControllerQueueItem(
+    val queueItem = new NodeToControllerQueueItem(
       time.milliseconds(),
       new MetadataRequest.Builder(new MetadataRequestData()),
       completionHandler
diff --git a/core/src/test/scala/unit/kafka/server/ForwardingManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ForwardingManagerTest.scala
index 16e4b2bcb66..46815eaaa4b 100644
--- a/core/src/test/scala/unit/kafka/server/ForwardingManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ForwardingManagerTest.scala
@@ -37,16 +37,18 @@ import 
org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import 
org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
 import org.apache.kafka.network.metrics.RequestChannelMetrics
 import org.apache.kafka.server.util.MockTime
+import org.apache.kafka.server.ControllerInformation
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Test
 import org.mockito.Mockito
 
+import java.util.function.Supplier
 import scala.jdk.CollectionConverters._
 
 class ForwardingManagerTest {
   private val time = new MockTime()
   private val client = new MockClient(time)
-  private val controllerNodeProvider = 
Mockito.mock(classOf[ControllerNodeProvider])
+  private val controllerNodeProvider = 
Mockito.mock(classOf[Supplier[ControllerInformation]])
   private val brokerToController = new MockNodeToControllerChannelManager(
     client, time, controllerNodeProvider, controllerApiVersions)
   private val metrics = new Metrics()
@@ -66,11 +68,11 @@ class ForwardingManagerTest {
   }
 
   private def controllerInfo = {
-    ControllerInformation(Some(new Node(0, "host", 1234)), new 
ListenerName(""), SecurityProtocol.PLAINTEXT, "")
+    new ControllerInformation(Optional.of(new Node(0, "host", 1234)), new 
ListenerName(""), SecurityProtocol.PLAINTEXT, "")
   }
 
   private def emptyControllerInfo = {
-    ControllerInformation(None, new ListenerName(""), 
SecurityProtocol.PLAINTEXT, "")
+    new ControllerInformation(Optional.empty(), new ListenerName(""), 
SecurityProtocol.PLAINTEXT, "")
   }
 
   @Test
@@ -84,7 +86,7 @@ class ForwardingManagerTest {
     val responseBuffer = 
RequestTestUtils.serializeResponseWithHeader(responseBody, 
requestHeader.apiVersion,
       requestCorrelationId + 1)
 
-    
Mockito.when(controllerNodeProvider.getControllerInfo()).thenReturn(controllerInfo)
+    Mockito.when(controllerNodeProvider.get()).thenReturn(controllerInfo)
     val isEnvelopeRequest: RequestMatcher = request => 
request.isInstanceOf[EnvelopeRequest]
     client.prepareResponse(isEnvelopeRequest, new 
EnvelopeResponse(responseBuffer, Errors.NONE))
 
@@ -108,7 +110,7 @@ class ForwardingManagerTest {
     val responseBuffer = 
RequestTestUtils.serializeResponseWithHeader(responseBody,
       requestHeader.apiVersion, requestCorrelationId)
 
-    
Mockito.when(controllerNodeProvider.getControllerInfo()).thenReturn(controllerInfo)
+    Mockito.when(controllerNodeProvider.get()).thenReturn(controllerInfo)
     val isEnvelopeRequest: RequestMatcher = request => 
request.isInstanceOf[EnvelopeRequest]
     client.prepareResponse(isEnvelopeRequest, new 
EnvelopeResponse(responseBuffer, Errors.UNSUPPORTED_VERSION))
 
@@ -125,7 +127,7 @@ class ForwardingManagerTest {
     val (requestHeader, requestBuffer) = buildRequest(testAlterConfigRequest, 
requestCorrelationId)
     val request = buildRequest(requestHeader, requestBuffer, clientPrincipal)
 
-    
Mockito.when(controllerNodeProvider.getControllerInfo()).thenReturn(emptyControllerInfo)
+    Mockito.when(controllerNodeProvider.get()).thenReturn(emptyControllerInfo)
 
     val response = new AtomicReference[AbstractResponse]()
     forwardingManager.forwardRequest(request, res => res.foreach(response.set))
@@ -149,7 +151,7 @@ class ForwardingManagerTest {
     val (requestHeader, requestBuffer) = buildRequest(testAlterConfigRequest, 
requestCorrelationId)
     val request = buildRequest(requestHeader, requestBuffer, clientPrincipal)
 
-    
Mockito.when(controllerNodeProvider.getControllerInfo()).thenReturn(controllerInfo)
+    Mockito.when(controllerNodeProvider.get()).thenReturn(controllerInfo)
 
     val response = new AtomicReference[AbstractResponse]()
     forwardingManager.forwardRequest(request, res => res.foreach(response.set))
@@ -175,7 +177,7 @@ class ForwardingManagerTest {
     val (requestHeader, requestBuffer) = buildRequest(testAlterConfigRequest, 
requestCorrelationId)
     val request = buildRequest(requestHeader, requestBuffer, clientPrincipal)
 
-    
Mockito.when(controllerNodeProvider.getControllerInfo()).thenReturn(controllerInfo)
+    Mockito.when(controllerNodeProvider.get()).thenReturn(controllerInfo)
 
     val isEnvelopeRequest: RequestMatcher = request => 
request.isInstanceOf[EnvelopeRequest]
     client.prepareUnsupportedVersionResponse(isEnvelopeRequest)
@@ -196,7 +198,7 @@ class ForwardingManagerTest {
     val (requestHeader, requestBuffer) = buildRequest(testAlterConfigRequest, 
requestCorrelationId)
     val request = buildRequest(requestHeader, requestBuffer, clientPrincipal)
 
-    
Mockito.when(controllerNodeProvider.getControllerInfo()).thenReturn(controllerInfo)
+    Mockito.when(controllerNodeProvider.get()).thenReturn(controllerInfo)
 
     client.createPendingAuthenticationError(controllerInfo.node.get, 50)
 
@@ -220,7 +222,7 @@ class ForwardingManagerTest {
     val responseBuffer = 
RequestTestUtils.serializeResponseWithHeader(responseBody,
       requestHeader.apiVersion, requestCorrelationId)
 
-    
Mockito.when(controllerNodeProvider.getControllerInfo()).thenReturn(controllerInfo)
+    Mockito.when(controllerNodeProvider.get()).thenReturn(controllerInfo)
     val isEnvelopeRequest: RequestMatcher = request => 
request.isInstanceOf[EnvelopeRequest]
     client.prepareResponse(isEnvelopeRequest, new 
EnvelopeResponse(responseBuffer, Errors.UNSUPPORTED_VERSION))
 
@@ -242,7 +244,7 @@ class ForwardingManagerTest {
     val (requestHeader, requestBuffer) = buildRequest(testAlterConfigRequest, 
requestCorrelationId)
     val request = buildRequest(requestHeader, requestBuffer, clientPrincipal)
 
-    
Mockito.when(controllerNodeProvider.getControllerInfo()).thenReturn(controllerInfo)
+    Mockito.when(controllerNodeProvider.get()).thenReturn(controllerInfo)
 
     val response = new AtomicReference[AbstractResponse]()
     forwardingManager.forwardRequest(request, res => res.foreach(response.set))
diff --git 
a/core/src/test/scala/unit/kafka/server/MockNodeToControllerChannelManager.scala
 
b/core/src/test/scala/unit/kafka/server/MockNodeToControllerChannelManager.scala
index 633a2ffc80f..2a162d64bf3 100644
--- 
a/core/src/test/scala/unit/kafka/server/MockNodeToControllerChannelManager.scala
+++ 
b/core/src/test/scala/unit/kafka/server/MockNodeToControllerChannelManager.scala
@@ -21,13 +21,16 @@ import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.AbstractRequest
 import org.apache.kafka.server.common.{ControllerRequestCompletionHandler, 
NodeToControllerChannelManager}
 import org.apache.kafka.server.util.MockTime
+import org.apache.kafka.server.{ControllerInformation, 
NodeToControllerQueueItem}
 
 import java.util.Optional
+import java.util.function.Supplier
+import scala.jdk.OptionConverters._
 
 class MockNodeToControllerChannelManager(
   val client: MockClient,
   time: MockTime,
-  controllerNodeProvider: ControllerNodeProvider,
+  controllerNodeProvider: Supplier[ControllerInformation],
   controllerApiVersions: NodeApiVersions = NodeApiVersions.create(),
   val retryTimeoutMs: Int = 60000,
   val requestTimeoutMs: Int = 30000
@@ -44,10 +47,10 @@ class MockNodeToControllerChannelManager(
     request: AbstractRequest.Builder[_ <: AbstractRequest],
     callback: ControllerRequestCompletionHandler
   ): Unit = {
-    unsentQueue.add(NodeToControllerQueueItem(
-      createdTimeMs = time.milliseconds(),
-      request = request,
-      callback = callback
+    unsentQueue.add(new NodeToControllerQueueItem(
+      time.milliseconds(),
+      request,
+      callback
     ))
   }
 
@@ -78,7 +81,7 @@ class MockNodeToControllerChannelManager(
         queueItem.callback.onTimeout()
         unsentIterator.remove()
       } else {
-        controllerNodeProvider.getControllerInfo().node match {
+        controllerNodeProvider.get().node.toScala match {
           case Some(controller) if client.ready(controller, 
time.milliseconds()) =>
             val clientRequest = client.newClientRequest(
               controller.idString,
@@ -86,7 +89,7 @@ class MockNodeToControllerChannelManager(
               queueItem.createdTimeMs,
               true, // we expect response,
               requestTimeoutMs,
-              handleResponse(queueItem)
+              handleResponse(queueItem) _
             )
             client.send(clientRequest, time.milliseconds())
             unsentIterator.remove()
diff --git 
a/core/src/test/scala/unit/kafka/server/RegistrationTestContext.scala 
b/core/src/test/scala/unit/kafka/server/RegistrationTestContext.scala
index dd5968055e0..b29498b92b4 100644
--- a/core/src/test/scala/unit/kafka/server/RegistrationTestContext.scala
+++ b/core/src/test/scala/unit/kafka/server/RegistrationTestContext.scala
@@ -28,12 +28,14 @@ import 
org.apache.kafka.common.protocol.ApiKeys.{BROKER_HEARTBEAT, BROKER_REGIST
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.utils.LogContext
 import org.apache.kafka.server.util.MockTime
+import org.apache.kafka.server.ControllerInformation
 
-import java.util.Properties
+import java.util.{Optional, Properties}
 import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, AtomicReference}
+import java.util.function.Supplier
 import scala.jdk.CollectionConverters._
 
-class SimpleControllerNodeProvider extends ControllerNodeProvider {
+class SimpleControllerNodeProvider extends Supplier[ControllerInformation] {
   val node = new AtomicReference[Node](null)
 
   def listenerName: ListenerName = new ListenerName("PLAINTEXT")
@@ -42,7 +44,7 @@ class SimpleControllerNodeProvider extends 
ControllerNodeProvider {
 
   def saslMechanism: String = SaslConfigs.DEFAULT_SASL_MECHANISM
 
-  override def getControllerInfo(): ControllerInformation = 
ControllerInformation(Option(node.get()),
+  override def get(): ControllerInformation = new 
ControllerInformation(Optional.ofNullable(node.get()),
     listenerName, securityProtocol, saslMechanism)
 }
 
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/common/NodeToControllerChannelManager.java
 
b/server-common/src/main/java/org/apache/kafka/server/common/NodeToControllerChannelManager.java
index 159913bf1ca..2c2510cb76b 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/common/NodeToControllerChannelManager.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/common/NodeToControllerChannelManager.java
@@ -26,7 +26,7 @@ public interface NodeToControllerChannelManager {
 
     void start();
 
-    void shutdown();
+    void shutdown() throws InterruptedException;
 
     Optional<NodeApiVersions> controllerApiVersions();
 
diff --git 
a/server/src/main/java/org/apache/kafka/server/ControllerInformation.java 
b/server/src/main/java/org/apache/kafka/server/ControllerInformation.java
new file mode 100644
index 00000000000..93a5517e2d9
--- /dev/null
+++ b/server/src/main/java/org/apache/kafka/server/ControllerInformation.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server;
+
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+
+import java.util.Optional;
+
+/**
+ * Connection information for communicating with the active Kafka controller 
in KRaft mode.
+ * <p>
+ * Contains the controller node endpoint and security configuration needed to 
establish
+ * a network connection. The node may be absent during cluster initialization 
or leader elections.
+ *
+ * @param node The controller node (id, host, port), or empty if controller is 
unknown
+ * @param listenerName The listener to use for controller connections
+ * @param securityProtocol The security protocol (PLAINTEXT, SSL, 
SASL_PLAINTEXT, SASL_SSL)
+ * @param saslMechanism The SASL mechanism for authentication (e.g., "PLAIN", 
"SCRAM-SHA-256")
+ */
+public record ControllerInformation(Optional<Node> node, ListenerName 
listenerName, SecurityProtocol securityProtocol,
+                                    String saslMechanism) {
+}
diff --git 
a/server/src/main/java/org/apache/kafka/server/NodeToControllerChannelManagerImpl.java
 
b/server/src/main/java/org/apache/kafka/server/NodeToControllerChannelManagerImpl.java
new file mode 100644
index 00000000000..c54fb97a65b
--- /dev/null
+++ 
b/server/src/main/java/org/apache/kafka/server/NodeToControllerChannelManagerImpl.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server;
+
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.ManualMetadataUpdater;
+import org.apache.kafka.clients.MetadataRecoveryStrategy;
+import org.apache.kafka.clients.NetworkClient;
+import org.apache.kafka.clients.NodeApiVersions;
+import org.apache.kafka.common.Reconfigurable;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.network.ChannelBuilder;
+import org.apache.kafka.common.network.ChannelBuilders;
+import org.apache.kafka.common.network.NetworkReceive;
+import org.apache.kafka.common.network.Selectable;
+import org.apache.kafka.common.network.Selector;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.security.JaasContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.raft.KRaftConfigs;
+import org.apache.kafka.server.common.ControllerRequestCompletionHandler;
+import org.apache.kafka.server.common.NodeToControllerChannelManager;
+import org.apache.kafka.server.config.AbstractKafkaConfig;
+import org.apache.kafka.server.config.ReplicationConfigs;
+import org.apache.kafka.server.config.ServerConfigs;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Supplier;
+
+/**
+ * Manages a communication channel from a node to the active KRaft controller.
+ * <p>
+ * Creates a network client with the appropriate security configuration and 
uses a background
+ * request thread to queue and send requests asynchronously. Supports dynamic 
reconfiguration
+ * of security settings when the controller listener configuration changes.
+ */
+public class NodeToControllerChannelManagerImpl implements 
NodeToControllerChannelManager {
+    private static final Logger log = 
LoggerFactory.getLogger(NodeToControllerChannelManagerImpl.class);
+    private final Time time;
+    private final Metrics metrics;
+    private final AbstractKafkaConfig config;
+    private final String channelName;
+    private final Long retryTimeoutMs;
+
+    private final LogContext logContext;
+    private final ManualMetadataUpdater manualMetadataUpdater = new 
ManualMetadataUpdater();
+    private final ApiVersions apiVersions = new ApiVersions();
+    private final NodeToControllerRequestThread requestThread;
+
+    public NodeToControllerChannelManagerImpl(Supplier<ControllerInformation> 
controllerNodeProvider, Time time, Metrics metrics, AbstractKafkaConfig config, 
String channelName, String threadNamePrefix, Long retryTimeoutMs) {
+        this.time = time;
+        this.metrics = metrics;
+        this.config = config;
+        this.channelName = channelName;
+        this.retryTimeoutMs = retryTimeoutMs;
+        this.logContext = new 
LogContext(String.format("[NodeToControllerChannelManager id=%s name=%s] ",
+                config.getInt(KRaftConfigs.NODE_ID_CONFIG), channelName));
+        String threadName = 
String.format("%sto-controller-%s-channel-manager", threadNamePrefix, 
channelName);
+        ControllerInformation controllerInformation = 
controllerNodeProvider.get();
+        this.requestThread = new NodeToControllerRequestThread(
+                buildNetworkClient(controllerInformation),
+                manualMetadataUpdater,
+                controllerNodeProvider,
+                config,
+                time,
+                threadName,
+                retryTimeoutMs
+        );
+    }
+
+    private KafkaClient buildNetworkClient(ControllerInformation 
controllerInfo) {
+        ChannelBuilder channelBuilder = ChannelBuilders.clientChannelBuilder(
+                controllerInfo.securityProtocol(),
+                JaasContext.Type.SERVER,
+                config,
+                controllerInfo.listenerName(),
+                controllerInfo.saslMechanism(),
+                time,
+                logContext
+        );
+        if (channelBuilder instanceof Reconfigurable reconfigurable) {
+            config.addReconfigurable(reconfigurable);
+        }
+        Selector selector = new Selector(
+                NetworkReceive.UNLIMITED,
+                Selector.NO_IDLE_TIMEOUT_MS,
+                metrics,
+                time,
+                channelName,
+                Map.of("BrokerId", String.valueOf(config.brokerId())),
+                false,
+                channelBuilder,
+                logContext
+        );
+        return new NetworkClient(
+                selector,
+                manualMetadataUpdater,
+                String.valueOf(config.brokerId()),
+                1,
+                50,
+                50,
+                Selectable.USE_DEFAULT_BUFFER_SIZE,
+                Selectable.USE_DEFAULT_BUFFER_SIZE,
+                Math.min(Integer.MAX_VALUE, (int) 
Math.min(config.getInt(ReplicationConfigs.CONTROLLER_SOCKET_TIMEOUT_MS_CONFIG), 
retryTimeoutMs)), // request timeout should not exceed the provided retry 
timeout
+                
config.getLong(ServerConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG),
+                
config.getLong(ServerConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG),
+                time,
+                true,
+                apiVersions,
+                logContext,
+                MetadataRecoveryStrategy.NONE
+        );
+    }
+
+    @Override
+    public void start() {
+        requestThread.start();
+    }
+
+    @Override
+    public void shutdown() throws InterruptedException {
+        requestThread.shutdown();
+        log.info("Node to controller channel manager for {} shutdown", 
channelName);
+    }
+
+    @Override
+    public Optional<NodeApiVersions> controllerApiVersions() {
+        return 
requestThread.activeControllerAddress().flatMap(activeController ->
+                
Optional.ofNullable(apiVersions.get(activeController.idString()))
+        );
+    }
+
+    /**
+     * Send request to the controller.
+     *
+     * @param request  The request to be sent.
+     * @param callback Request completion callback.
+     */
+    @Override
+    public void sendRequest(AbstractRequest.Builder<? extends AbstractRequest> 
request,
+                            ControllerRequestCompletionHandler callback) {
+        requestThread.enqueue(new NodeToControllerQueueItem(
+                time.milliseconds(),
+                request,
+                callback
+        ));
+    }
+
+    @Override
+    public long getTimeoutMs() {
+        return retryTimeoutMs;
+    }
+}
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/common/NodeToControllerChannelManager.java
 b/server/src/main/java/org/apache/kafka/server/NodeToControllerQueueItem.java
similarity index 54%
copy from 
server-common/src/main/java/org/apache/kafka/server/common/NodeToControllerChannelManager.java
copy to 
server/src/main/java/org/apache/kafka/server/NodeToControllerQueueItem.java
index 159913bf1ca..25b657fbe16 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/common/NodeToControllerChannelManager.java
+++ 
b/server/src/main/java/org/apache/kafka/server/NodeToControllerQueueItem.java
@@ -15,25 +15,20 @@
  * limitations under the License.
  */
 
-package org.apache.kafka.server.common;
+package org.apache.kafka.server;
 
-import org.apache.kafka.clients.NodeApiVersions;
 import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.server.common.ControllerRequestCompletionHandler;
 
-import java.util.Optional;
-
-public interface NodeToControllerChannelManager {
-
-    void start();
-
-    void shutdown();
-
-    Optional<NodeApiVersions> controllerApiVersions();
-
-    void sendRequest(
-        AbstractRequest.Builder<? extends AbstractRequest> request,
-        ControllerRequestCompletionHandler callback
-    );
-
-    long getTimeoutMs();
+/**
+ * Represents a queued request to be sent to the controller.
+ * Used for timeout tracking and asynchronous completion handling.
+ *
+ * @param createdTimeMs timestamp when this request was created, used for 
timeout detection
+ * @param request the request to send to the controller
+ * @param callback handler invoked when the request completes, fails, or times 
out
+ */
+public record NodeToControllerQueueItem(Long createdTimeMs,
+                                        AbstractRequest.Builder<? extends 
AbstractRequest> request,
+                                        ControllerRequestCompletionHandler 
callback) {
 }
diff --git 
a/server/src/main/java/org/apache/kafka/server/NodeToControllerRequestThread.java
 
b/server/src/main/java/org/apache/kafka/server/NodeToControllerRequestThread.java
new file mode 100644
index 00000000000..e3d870999b2
--- /dev/null
+++ 
b/server/src/main/java/org/apache/kafka/server/NodeToControllerRequestThread.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.ManualMetadataUpdater;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.server.config.ReplicationConfigs;
+import org.apache.kafka.server.util.InterBrokerSendThread;
+import org.apache.kafka.server.util.RequestAndCompletionHandler;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+
+/**
+ * Background thread that manages to send requests to the active controller.
+ * <p>
+ * Maintains a queue of pending requests and handles automatic retries on 
failures,
+ * controller failover, and timeout detection. Requests are re-queued when the 
controller
+ * changes or connections are lost.
+ */
+public class NodeToControllerRequestThread extends InterBrokerSendThread {
+    private static final Logger log = 
LoggerFactory.getLogger(NodeToControllerRequestThread.class);
+
+    private final LinkedBlockingDeque<NodeToControllerQueueItem> requestQueue 
= new LinkedBlockingDeque<>();
+    private final AtomicReference<Node> activeController = new 
AtomicReference<>(null);
+
+
+    private final Time time;
+    private final long retryTimeoutMs;
+    private final Supplier<ControllerInformation> controllerNodeProvider;
+    private final ManualMetadataUpdater metadataUpdater;
+
+    // Used for testing
+    volatile boolean started = false;
+    public void setStarted(boolean started) {
+        this.started = started;
+    }
+
+    public NodeToControllerRequestThread(KafkaClient initialNetworkClient,
+                                         ManualMetadataUpdater metadataUpdater,
+                                         Supplier<ControllerInformation> 
controllerNodeProvider,
+                                         AbstractConfig config,
+                                         Time time,
+                                         String threadName,
+                                         Long retryTimeoutMs) {
+        super(threadName, initialNetworkClient, Math.min(Integer.MAX_VALUE, 
(int) 
Math.min(config.getInt(ReplicationConfigs.CONTROLLER_SOCKET_TIMEOUT_MS_CONFIG), 
retryTimeoutMs)), time, false);
+        this.time = time;
+        this.controllerNodeProvider = controllerNodeProvider;
+        this.metadataUpdater = metadataUpdater;
+        this.retryTimeoutMs = retryTimeoutMs;
+    }
+
+    public Optional<Node> activeControllerAddress() {
+        return Optional.ofNullable(activeController.get());
+    }
+
+    private void updateControllerAddress(Node newActiveController) {
+        activeController.set(newActiveController);
+    }
+
+    public void enqueue(NodeToControllerQueueItem request) {
+        if (!started) {
+            throw new IllegalStateException("Cannot enqueue a request if the 
request thread is not running");
+        }
+        requestQueue.add(request);
+        if (activeControllerAddress().isPresent()) {
+            wakeup();
+        }
+    }
+
+    public int queueSize() {
+        return requestQueue.size();
+    }
+
+    @Override
+    public Collection<RequestAndCompletionHandler> generateRequests() {
+        final long currentTimeMs = time.milliseconds();
+        final Iterator<NodeToControllerQueueItem> requestIter = 
requestQueue.iterator();
+        while (requestIter.hasNext()) {
+            var request = requestIter.next();
+            if (currentTimeMs - request.createdTimeMs() >= retryTimeoutMs) {
+                requestIter.remove();
+                request.callback().onTimeout();
+            } else {
+                Optional<Node> controllerAddress = activeControllerAddress();
+                if (controllerAddress.isPresent()) {
+                    requestIter.remove();
+                    return List.of(new RequestAndCompletionHandler(
+                            time.milliseconds(),
+                            controllerAddress.get(),
+                            request.request(),
+                            response -> handleResponse(request, response)
+                    ));
+                }
+            }
+        }
+
+        return List.of();
+    }
+
+    void handleResponse(NodeToControllerQueueItem queueItem, ClientResponse 
response) {
+        log.debug("Request {} received {}", queueItem.request(), response);
+        if (response.authenticationException() != null) {
+            log.error("Request {} failed due to authentication error with 
controller. Disconnecting the " +
+                            "connection to the stale controller {}",
+                    queueItem.request(), 
activeControllerAddress().map(Node::idString).orElse("null"),
+                    response.authenticationException()
+            );
+            maybeDisconnectAndUpdateController();
+            queueItem.callback().onComplete(response);
+        } else if (response.versionMismatch() != null) {
+            log.error("Request {} failed due to unsupported version error", 
queueItem.request(),
+                    response.versionMismatch());
+            queueItem.callback().onComplete(response);
+        } else if (response.wasDisconnected()) {
+            updateControllerAddress(null);
+            requestQueue.addFirst(queueItem);
+        } else if 
(response.responseBody().errorCounts().containsKey(Errors.NOT_CONTROLLER)) {
+            log.debug("Request {} received NOT_CONTROLLER exception. 
Disconnecting the " +
+                            "connection to the stale controller {}",
+                    queueItem.request(),
+                    
activeControllerAddress().map(Node::idString).orElse("null"));
+            maybeDisconnectAndUpdateController();
+            requestQueue.addFirst(queueItem);
+        } else {
+            queueItem.callback().onComplete(response);
+        }
+    }
+
+    private void maybeDisconnectAndUpdateController() {
+        // just close the controller connection and wait for metadata cache 
update in doWork
+        activeControllerAddress().ifPresent(controllerAddress -> {
+            try {
+                // We don't care if disconnect has an error, just log it and 
get a new network client
+                networkClient.disconnect(controllerAddress.idString());
+            } catch (Throwable t) {
+                log.error("Had an error while disconnecting from 
NetworkClient.", t);
+            }
+            updateControllerAddress(null);
+        });
+    }
+
+    @Override
+    public void doWork() {
+        if (activeControllerAddress().isPresent()) {
+            super.pollOnce(Long.MAX_VALUE);
+        } else {
+            log.debug("Controller isn't cached, looking for local metadata 
changes");
+            final ControllerInformation controllerInformation = 
controllerNodeProvider.get();
+            Optional<Node> nodeOptional = controllerInformation.node();
+            if (nodeOptional.isPresent()) {
+                Node controllerNode = nodeOptional.get();
+                log.info("Recorded new KRaft controller, from now on will use 
node {}", controllerNode);
+                updateControllerAddress(controllerNode);
+                metadataUpdater.setNodes(List.of(controllerNode));
+            } else {
+                // need to backoff to avoid tight loops
+                log.debug("No controller provided, retrying after backoff");
+                super.pollOnce(100);
+            }
+        }
+    }
+
+    @Override
+    public void start() {
+        super.start();
+        started = true;
+    }
+}
diff --git 
a/server/src/main/java/org/apache/kafka/server/RaftControllerNodeProvider.java 
b/server/src/main/java/org/apache/kafka/server/RaftControllerNodeProvider.java
new file mode 100644
index 00000000000..6d4f2313501
--- /dev/null
+++ 
b/server/src/main/java/org/apache/kafka/server/RaftControllerNodeProvider.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server;
+
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.raft.KRaftConfigs;
+import org.apache.kafka.raft.RaftManager;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.config.AbstractKafkaConfig;
+
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.function.Supplier;
+
+/**
+ * Finds the controller node by checking the metadata log manager.
+ * This provider is used when we are using a Raft-based metadata quorum.
+ */
+public class RaftControllerNodeProvider implements 
Supplier<ControllerInformation> {
+
+    private final RaftManager<ApiMessageAndVersion> raftManager;
+    private final ListenerName listenerName;
+    private final SecurityProtocol securityProtocol;
+    private final String saslMechanism;
+
+    public static RaftControllerNodeProvider 
create(RaftManager<ApiMessageAndVersion> raftManager, AbstractKafkaConfig 
config) {
+        final ListenerName controllerListenerName = new 
ListenerName(config.controllerListenerNames().get(0));
+        final SecurityProtocol controllerSecurityProtocol = 
Optional.ofNullable(config.effectiveListenerSecurityProtocolMap().get(controllerListenerName))
+                .orElseGet(() -> 
SecurityProtocol.forName(controllerListenerName.value()));
+        final String controllerSaslMechanism = 
config.getString(KRaftConfigs.SASL_MECHANISM_CONTROLLER_PROTOCOL_CONFIG);
+        return new RaftControllerNodeProvider(raftManager, 
controllerListenerName, controllerSecurityProtocol, controllerSaslMechanism);
+    }
+
+    public RaftControllerNodeProvider(RaftManager<ApiMessageAndVersion> 
raftManager, ListenerName listenerName, SecurityProtocol securityProtocol, 
String saslMechanism) {
+        this.raftManager = raftManager;
+        this.listenerName = listenerName;
+        this.securityProtocol = securityProtocol;
+        this.saslMechanism = saslMechanism;
+    }
+
+    @SuppressWarnings("resource")
+    @Override
+    public ControllerInformation get() {
+        OptionalInt leaderIdOpt = 
raftManager.client().leaderAndEpoch().leaderId();
+
+        Optional<Node> node = leaderIdOpt.isPresent()
+                ? raftManager.client().voterNode(leaderIdOpt.getAsInt(), 
listenerName)
+                : Optional.empty();
+
+        return new ControllerInformation(node, listenerName, securityProtocol, 
saslMechanism);
+    }
+}
diff --git 
a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java 
b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
index 7b77e2962cf..f0d25f3ca42 100644
--- 
a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
+++ 
b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.server.config;
 
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Reconfigurable;
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigException;
@@ -207,4 +208,24 @@ public abstract class AbstractKafkaConfig extends 
AbstractConfig {
         }
         return connectionString.substring(0, 
firstColon).toUpperCase(Locale.ROOT);
     }
+
+    /**
+     * Registers a component for dynamic reconfiguration notifications.
+     * <p>
+     * This method exists to support migration from kafka.server.KafkaConfig 
(Scala/core) to AbstractKafkaConfig (Java/server).
+     * When migrating code, replace KafkaConfig references with 
AbstractKafkaConfig.
+     *
+     * @param reconfigurable the component to register for configuration 
updates
+     */
+    public abstract void addReconfigurable(Reconfigurable reconfigurable);
+
+    /**
+     * Unregisters a component from dynamic reconfiguration notifications.
+     * <p>
+     * This method exists to support migration from kafka.server.KafkaConfig 
(Scala/core) to AbstractKafkaConfig (Java/server).
+     * When migrating code, replace KafkaConfig references with 
AbstractKafkaConfig.
+     *
+     * @param reconfigurable the component to unregister
+     */
+    public abstract void removeReconfigurable(Reconfigurable reconfigurable);
 }
diff --git 
a/server/src/test/java/org/apache/kafka/server/BrokerRegistrationRequestTest.java
 
b/server/src/test/java/org/apache/kafka/server/BrokerRegistrationRequestTest.java
index 827bb60fbe8..3bb6a2199b4 100644
--- 
a/server/src/test/java/org/apache/kafka/server/BrokerRegistrationRequestTest.java
+++ 
b/server/src/test/java/org/apache/kafka/server/BrokerRegistrationRequestTest.java
@@ -16,10 +16,8 @@
  */
 package org.apache.kafka.server;
 
-import kafka.server.ControllerInformation;
-import kafka.server.ControllerNodeProvider;
+
 import kafka.server.ControllerServer;
-import kafka.server.NodeToControllerChannelManagerImpl;
 
 import org.apache.kafka.clients.ClientResponse;
 import org.apache.kafka.common.Node;
@@ -48,8 +46,7 @@ import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-
-import scala.Option;
+import java.util.function.Supplier;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
@@ -125,7 +122,7 @@ class BrokerRegistrationRequestTest {
         }
 
         @Override
-        public void close() {
+        public void close() throws InterruptedException {
             channelManager.shutdown();
             metrics.close();
         }
@@ -213,7 +210,7 @@ class BrokerRegistrationRequestTest {
     }
 
     record TestControllerNodeProvider(ClusterInstance clusterInstance)
-            implements ControllerNodeProvider {
+            implements Supplier<ControllerInformation> {
 
         public Optional<Node> node() {
             return Optional.of(new Node(
@@ -236,9 +233,9 @@ class BrokerRegistrationRequestTest {
         }
 
         @Override
-        public ControllerInformation getControllerInfo() {
-            return ControllerInformation.apply(
-                    Option.apply(node().orElse(null)),
+        public ControllerInformation get() {
+            return new ControllerInformation(
+                    node(),
                     listenerName(),
                     securityProtocol(),
                     saslMechanism()
diff --git 
a/server/src/test/java/org/apache/kafka/server/transaction/AddPartitionsToTxnManagerTest.java
 
b/server/src/test/java/org/apache/kafka/server/transaction/AddPartitionsToTxnManagerTest.java
index 86abcdcb6c2..91b2dc586af 100644
--- 
a/server/src/test/java/org/apache/kafka/server/transaction/AddPartitionsToTxnManagerTest.java
+++ 
b/server/src/test/java/org/apache/kafka/server/transaction/AddPartitionsToTxnManagerTest.java
@@ -90,7 +90,17 @@ public class AddPartitionsToTxnManagerTest {
                 KRaftConfigs.NODE_ID_CONFIG, "1",
                 KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER"),
             Map.of(),
-            false) { };
+            false) {
+        @Override
+        public void addReconfigurable(org.apache.kafka.common.Reconfigurable 
reconfigurable) {
+            // No-op for test
+        }
+
+        @Override
+        public void 
removeReconfigurable(org.apache.kafka.common.Reconfigurable reconfigurable) {
+            // No-op for test
+        }
+    };
     private final AddPartitionsToTxnManager addPartitionsToTxnManager =
             new AddPartitionsToTxnManager(config, networkClient, 
metadataCache, partitionFor, time);
 

Reply via email to