This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 148be487859 KAFKA-20012 Move NodeToControllerChannelManagerImpl to
server module (#21261)
148be487859 is described below
commit 148be4878594ff145c0fe753815012fb6a43dff0
Author: Chih-Yuan Chien <[email protected]>
AuthorDate: Thu Jan 15 23:34:22 2026 +0800
KAFKA-20012 Move NodeToControllerChannelManagerImpl to server module
(#21261)
Migrates `NodeToControllerChannelManager` and related classes from Scala
(core module) to Java (server module).
New Java Classes (server module) include `ControllerInformation`,
`RaftControllerNodeProvider`, `NodeToControllerChannelManagerImpl`,
`NodeToControllerRequestThread` and `NodeToControllerQueueItem`.
In `AbstractKafkaConfig`, added
`addReconfigurable()`/`removeReconfigurable()` abstract methods to
support migration. In `KafkaConfig`, added `override` keywords for
reconfigurable methods. Updated imports and usages across core module
classes and tests. Replace `ControllerNodeProvider` by
`Supplier<ControllerInformation>`.
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../scala/kafka/server/AlterPartitionManager.scala | 20 +-
.../src/main/scala/kafka/server/BrokerServer.scala | 10 +-
.../main/scala/kafka/server/ControllerServer.scala | 4 +-
core/src/main/scala/kafka/server/KafkaConfig.scala | 4 +-
.../server/NodeToControllerChannelManager.scala | 342 ---------------------
.../server/NodeToControllerRequestThreadTest.scala | 107 ++++---
.../unit/kafka/server/ForwardingManagerTest.scala | 24 +-
.../MockNodeToControllerChannelManager.scala | 17 +-
.../kafka/server/RegistrationTestContext.scala | 8 +-
.../common/NodeToControllerChannelManager.java | 2 +-
.../apache/kafka/server/ControllerInformation.java | 39 +++
.../server/NodeToControllerChannelManagerImpl.java | 174 +++++++++++
.../kafka/server/NodeToControllerQueueItem.java | 31 +-
.../server/NodeToControllerRequestThread.java | 197 ++++++++++++
.../kafka/server/RaftControllerNodeProvider.java | 69 +++++
.../kafka/server/config/AbstractKafkaConfig.java | 21 ++
.../server/BrokerRegistrationRequestTest.java | 17 +-
.../transaction/AddPartitionsToTxnManagerTest.java | 12 +-
18 files changed, 639 insertions(+), 459 deletions(-)
diff --git a/core/src/main/scala/kafka/server/AlterPartitionManager.scala
b/core/src/main/scala/kafka/server/AlterPartitionManager.scala
index 9f5052349c8..3b060a35d8f 100644
--- a/core/src/main/scala/kafka/server/AlterPartitionManager.scala
+++ b/core/src/main/scala/kafka/server/AlterPartitionManager.scala
@@ -29,7 +29,9 @@ import org.apache.kafka.common.utils.Time
import org.apache.kafka.metadata.{LeaderAndIsr, LeaderRecoveryState}
import org.apache.kafka.server.common.{ControllerRequestCompletionHandler,
NodeToControllerChannelManager, TopicIdPartition}
import org.apache.kafka.server.util.Scheduler
+import org.apache.kafka.server.{ControllerInformation,
NodeToControllerChannelManagerImpl}
+import java.util.function.Supplier
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
import scala.jdk.OptionConverters.RichOptional
@@ -66,7 +68,7 @@ object AlterPartitionManager {
def apply(
config: KafkaConfig,
scheduler: Scheduler,
- controllerNodeProvider: ControllerNodeProvider,
+ controllerNodeProvider: Supplier[ControllerInformation],
time: Time,
metrics: Metrics,
threadNamePrefix: String,
@@ -74,12 +76,12 @@ object AlterPartitionManager {
): AlterPartitionManager = {
val channelManager = new NodeToControllerChannelManagerImpl(
controllerNodeProvider,
- time = time,
- metrics = metrics,
- config = config,
- channelName = "alter-partition",
- threadNamePrefix = threadNamePrefix,
- retryTimeoutMs = Long.MaxValue
+ time,
+ metrics,
+ config,
+ "alter-partition",
+ threadNamePrefix,
+ Long.MaxValue
)
new DefaultAlterPartitionManager(
controllerChannelManager = channelManager,
@@ -150,7 +152,7 @@ class DefaultAlterPartitionManager(
val request = buildRequest(inflightAlterPartitionItems, brokerEpoch)
debug(s"Sending AlterPartition to controller $request")
- // We will not timeout AlterPartition request, instead letting it retry
indefinitely
+ // We will not time out AlterPartition request, instead letting it retry
indefinitely
// until a response is received, or a new LeaderAndIsr overwrites the
existing isrState
// which causes the response for those partitions to be ignored.
controllerChannelManager.sendRequest(request,
@@ -159,7 +161,7 @@ class DefaultAlterPartitionManager(
debug(s"Received AlterPartition response $response")
val error = try {
if (response.authenticationException != null) {
- // For now we treat authentication errors as retriable. We use
the
+ // For now, we treat authentication errors as retriable. We use
the
// `NETWORK_EXCEPTION` error code for lack of a good alternative.
// Note that `NodeToControllerChannelManager` will still log the
// authentication errors so that users have a chance to fix the
problem.
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 2d21ee59eff..67ea88b37fb 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -59,6 +59,8 @@ import org.apache.kafka.server.{AssignmentsManager,
BrokerFeatures, ClientMetric
import org.apache.kafka.server.transaction.AddPartitionsToTxnManager
import org.apache.kafka.storage.internals.log.LogDirFailureChannel
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
+import org.apache.kafka.server.NodeToControllerChannelManagerImpl
+import org.apache.kafka.server.RaftControllerNodeProvider
import java.time.Duration
import java.util
@@ -233,16 +235,16 @@ class BrokerServer(
"controller quorum voters future",
sharedServer.controllerQuorumVotersFuture,
startupDeadline, time)
- val controllerNodeProvider = RaftControllerNodeProvider(raftManager,
config)
+ val controllerNodeProvider =
RaftControllerNodeProvider.create(raftManager, config)
clientToControllerChannelManager = new
NodeToControllerChannelManagerImpl(
controllerNodeProvider,
time,
metrics,
config,
- channelName = "forwarding",
+ "forwarding",
s"broker-${config.nodeId}-",
- retryTimeoutMs = 60000
+ 60000
)
clientToControllerChannelManager.start()
forwardingManager = new
ForwardingManagerImpl(clientToControllerChannelManager, metrics)
@@ -316,7 +318,7 @@ class BrokerServer(
config,
"directory-assignments",
s"broker-${config.nodeId}-",
- retryTimeoutMs = 60000
+ 60000
)
assignmentsManager = new AssignmentsManager(
time,
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala
b/core/src/main/scala/kafka/server/ControllerServer.scala
index 81af28dc495..966ade4403d 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -51,6 +51,8 @@ import org.apache.kafka.server.metrics.{KafkaMetricsGroup,
KafkaYammerMetrics, L
import org.apache.kafka.server.network.{EndpointReadyFutures,
KafkaAuthorizerServerInfo}
import org.apache.kafka.server.policy.{AlterConfigPolicy, CreateTopicPolicy}
import org.apache.kafka.server.util.{Deadline, FutureUtils}
+import org.apache.kafka.server.NodeToControllerChannelManagerImpl
+import org.apache.kafka.server.RaftControllerNodeProvider
import java.util
import java.util.{Optional, OptionalLong}
@@ -405,7 +407,7 @@ class ControllerServer(
/**
* Start the KIP-919 controller registration manager.
*/
- val controllerNodeProvider = RaftControllerNodeProvider(raftManager,
config)
+ val controllerNodeProvider =
RaftControllerNodeProvider.create(raftManager, config)
registrationChannelManager = new NodeToControllerChannelManagerImpl(
controllerNodeProvider,
time,
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 706a359afed..2092d36d7f9 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -420,11 +420,11 @@ class KafkaConfig private(doLog: Boolean, val props:
util.Map[_, _])
val unstableApiVersionsEnabled =
getBoolean(ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG)
val unstableFeatureVersionsEnabled =
getBoolean(ServerConfigs.UNSTABLE_FEATURE_VERSIONS_ENABLE_CONFIG)
- def addReconfigurable(reconfigurable: Reconfigurable): Unit = {
+ override def addReconfigurable(reconfigurable: Reconfigurable): Unit = {
dynamicConfig.addReconfigurable(reconfigurable)
}
- def removeReconfigurable(reconfigurable: Reconfigurable): Unit = {
+ override def removeReconfigurable(reconfigurable: Reconfigurable): Unit = {
dynamicConfig.removeReconfigurable(reconfigurable)
}
diff --git
a/core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala
b/core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala
deleted file mode 100644
index 0caa03ec052..00000000000
--- a/core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala
+++ /dev/null
@@ -1,342 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-import kafka.utils.Logging
-import org.apache.kafka.clients._
-import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.network._
-import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.AbstractRequest
-import org.apache.kafka.common.security.JaasContext
-import org.apache.kafka.common.security.auth.SecurityProtocol
-import org.apache.kafka.common.utils.{LogContext, Time}
-import org.apache.kafka.common.{Node, Reconfigurable}
-import org.apache.kafka.raft.RaftManager
-import org.apache.kafka.server.common.{ApiMessageAndVersion,
ControllerRequestCompletionHandler, NodeToControllerChannelManager}
-import org.apache.kafka.server.util.{InterBrokerSendThread,
RequestAndCompletionHandler}
-
-import java.util
-import java.util.Optional
-import java.util.concurrent.LinkedBlockingDeque
-import java.util.concurrent.atomic.AtomicReference
-import scala.collection.Seq
-import scala.jdk.CollectionConverters._
-import scala.jdk.OptionConverters.{RichOption, RichOptional, RichOptionalInt}
-
-case class ControllerInformation(
- node: Option[Node],
- listenerName: ListenerName,
- securityProtocol: SecurityProtocol,
- saslMechanism: String
-)
-
-trait ControllerNodeProvider {
- def getControllerInfo(): ControllerInformation
-}
-
-object RaftControllerNodeProvider {
- def apply(
- raftManager: RaftManager[ApiMessageAndVersion],
- config: KafkaConfig,
- ): RaftControllerNodeProvider = {
- val controllerListenerName = new
ListenerName(config.controllerListenerNames.get(0))
- val controllerSecurityProtocol =
Option(config.effectiveListenerSecurityProtocolMap.get(controllerListenerName))
- .getOrElse(SecurityProtocol.forName(controllerListenerName.value()))
- val controllerSaslMechanism = config.saslMechanismControllerProtocol
- new RaftControllerNodeProvider(
- raftManager,
- controllerListenerName,
- controllerSecurityProtocol,
- controllerSaslMechanism
- )
- }
-}
-
-/**
- * Finds the controller node by checking the metadata log manager.
- * This provider is used when we are using a Raft-based metadata quorum.
- */
-class RaftControllerNodeProvider(
- val raftManager: RaftManager[ApiMessageAndVersion],
- val listenerName: ListenerName,
- val securityProtocol: SecurityProtocol,
- val saslMechanism: String
-) extends ControllerNodeProvider with Logging {
-
- private def idToNode(id: Int): Option[Node] =
raftManager.client.voterNode(id, listenerName).toScala
-
- override def getControllerInfo(): ControllerInformation =
-
ControllerInformation(raftManager.client.leaderAndEpoch.leaderId.toScala.flatMap(idToNode),
- listenerName, securityProtocol, saslMechanism)
-}
-
-/**
- * This class manages the connection between a broker and the controller. It
runs a single
- * [[NodeToControllerRequestThread]] which uses the broker's metadata cache as
its own metadata to find
- * and connect to the controller. The channel is async and runs the network
connection in the background.
- * The maximum number of in-flight requests are set to one to ensure orderly
response from the controller, therefore
- * care must be taken to not block on outstanding requests for too long.
- */
-class NodeToControllerChannelManagerImpl(
- controllerNodeProvider: ControllerNodeProvider,
- time: Time,
- metrics: Metrics,
- config: KafkaConfig,
- channelName: String,
- threadNamePrefix: String,
- retryTimeoutMs: Long
-) extends NodeToControllerChannelManager with Logging {
- private val logContext = new LogContext(s"[NodeToControllerChannelManager
id=${config.nodeId} name=${channelName}] ")
- private val manualMetadataUpdater = new ManualMetadataUpdater()
- private val apiVersions = new ApiVersions()
- private val requestThread = newRequestThread
-
- def start(): Unit = {
- requestThread.start()
- }
-
- def shutdown(): Unit = {
- requestThread.shutdown()
- info(s"Node to controller channel manager for $channelName shutdown")
- }
-
- private[server] def newRequestThread = {
- def buildNetworkClient(controllerInfo: ControllerInformation) = {
- val channelBuilder = ChannelBuilders.clientChannelBuilder(
- controllerInfo.securityProtocol,
- JaasContext.Type.SERVER,
- config,
- controllerInfo.listenerName,
- controllerInfo.saslMechanism,
- time,
- logContext
- )
- channelBuilder match {
- case reconfigurable: Reconfigurable =>
config.addReconfigurable(reconfigurable)
- case _ =>
- }
- val selector = new Selector(
- NetworkReceive.UNLIMITED,
- Selector.NO_IDLE_TIMEOUT_MS,
- metrics,
- time,
- channelName,
- Map("BrokerId" -> config.brokerId.toString).asJava,
- false,
- channelBuilder,
- logContext
- )
- new NetworkClient(
- selector,
- manualMetadataUpdater,
- config.brokerId.toString,
- 1,
- 50,
- 50,
- Selectable.USE_DEFAULT_BUFFER_SIZE,
- Selectable.USE_DEFAULT_BUFFER_SIZE,
- Math.min(Int.MaxValue, Math.min(config.controllerSocketTimeoutMs,
retryTimeoutMs)).toInt, // request timeout should not exceed the provided retry
timeout
- config.connectionSetupTimeoutMs,
- config.connectionSetupTimeoutMaxMs,
- time,
- true,
- apiVersions,
- logContext,
- MetadataRecoveryStrategy.NONE
- )
- }
- val threadName =
s"${threadNamePrefix}to-controller-${channelName}-channel-manager"
-
- val controllerInformation = controllerNodeProvider.getControllerInfo()
- new NodeToControllerRequestThread(
- buildNetworkClient(controllerInformation),
- manualMetadataUpdater,
- controllerNodeProvider,
- config,
- time,
- threadName,
- retryTimeoutMs
- )
- }
-
- /**
- * Send request to the controller.
- *
- * @param request The request to be sent.
- * @param callback Request completion callback.
- */
- def sendRequest(
- request: AbstractRequest.Builder[_ <: AbstractRequest],
- callback: ControllerRequestCompletionHandler
- ): Unit = {
- requestThread.enqueue(NodeToControllerQueueItem(
- time.milliseconds(),
- request,
- callback
- ))
- }
-
- def controllerApiVersions(): Optional[NodeApiVersions] = {
- requestThread.activeControllerAddress().flatMap { activeController =>
- Option(apiVersions.get(activeController.idString))
- }.toJava
- }
-
- def getTimeoutMs: Long = retryTimeoutMs
-}
-
-case class NodeToControllerQueueItem(
- createdTimeMs: Long,
- request: AbstractRequest.Builder[_ <: AbstractRequest],
- callback: ControllerRequestCompletionHandler
-)
-
-class NodeToControllerRequestThread(
- initialNetworkClient: KafkaClient,
- metadataUpdater: ManualMetadataUpdater,
- controllerNodeProvider: ControllerNodeProvider,
- config: KafkaConfig,
- time: Time,
- threadName: String,
- retryTimeoutMs: Long
-) extends InterBrokerSendThread(
- threadName,
- initialNetworkClient,
- Math.min(Int.MaxValue, Math.min(config.controllerSocketTimeoutMs,
retryTimeoutMs)).toInt,
- time,
- false
-) with Logging {
-
- this.logIdent = logPrefix
-
- private val requestQueue = new
LinkedBlockingDeque[NodeToControllerQueueItem]()
- private val activeController = new AtomicReference[Node](null)
-
- // Used for testing
- @volatile
- private[server] var started = false
-
- def activeControllerAddress(): Option[Node] = {
- Option(activeController.get())
- }
-
- private def updateControllerAddress(newActiveController: Node): Unit = {
- activeController.set(newActiveController)
- }
-
- def enqueue(request: NodeToControllerQueueItem): Unit = {
- if (!started) {
- throw new IllegalStateException("Cannot enqueue a request if the request
thread is not running")
- }
- requestQueue.add(request)
- if (activeControllerAddress().isDefined) {
- wakeup()
- }
- }
-
- def queueSize: Int = {
- requestQueue.size
- }
-
- override def generateRequests():
util.Collection[RequestAndCompletionHandler] = {
- val currentTimeMs = time.milliseconds()
- val requestIter = requestQueue.iterator()
- while (requestIter.hasNext) {
- val request = requestIter.next
- if (currentTimeMs - request.createdTimeMs >= retryTimeoutMs) {
- requestIter.remove()
- request.callback.onTimeout()
- } else {
- val controllerAddress = activeControllerAddress()
- if (controllerAddress.isDefined) {
- requestIter.remove()
- return util.Collections.singletonList(new
RequestAndCompletionHandler(
- time.milliseconds(),
- controllerAddress.get,
- request.request,
- response => handleResponse(request)(response)
- ))
- }
- }
- }
- util.Collections.emptyList()
- }
-
- private[server] def handleResponse(queueItem:
NodeToControllerQueueItem)(response: ClientResponse): Unit = {
- debug(s"Request ${queueItem.request} received $response")
- if (response.authenticationException != null) {
- error(s"Request ${queueItem.request} failed due to authentication error
with controller. Disconnecting the " +
- s"connection to the stale controller
${activeControllerAddress().map(_.idString).getOrElse("null")}",
- response.authenticationException)
- maybeDisconnectAndUpdateController()
- queueItem.callback.onComplete(response)
- } else if (response.versionMismatch != null) {
- error(s"Request ${queueItem.request} failed due to unsupported version
error",
- response.versionMismatch)
- queueItem.callback.onComplete(response)
- } else if (response.wasDisconnected()) {
- updateControllerAddress(null)
- requestQueue.putFirst(queueItem)
- } else if
(response.responseBody().errorCounts().containsKey(Errors.NOT_CONTROLLER)) {
- debug(s"Request ${queueItem.request} received NOT_CONTROLLER exception.
Disconnecting the " +
- s"connection to the stale controller
${activeControllerAddress().map(_.idString).getOrElse("null")}")
- maybeDisconnectAndUpdateController()
- requestQueue.putFirst(queueItem)
- } else {
- queueItem.callback.onComplete(response)
- }
- }
-
- private def maybeDisconnectAndUpdateController(): Unit = {
- // just close the controller connection and wait for metadata cache update
in doWork
- activeControllerAddress().foreach { controllerAddress =>
- try {
- // We don't care if disconnect has an error, just log it and get a new
network client
- networkClient.disconnect(controllerAddress.idString)
- } catch {
- case t: Throwable => error("Had an error while disconnecting from
NetworkClient.", t)
- }
- updateControllerAddress(null)
- }
- }
-
- override def doWork(): Unit = {
- val controllerInformation = controllerNodeProvider.getControllerInfo()
- if (activeControllerAddress().isDefined) {
- super.pollOnce(Long.MaxValue)
- } else {
- debug("Controller isn't cached, looking for local metadata changes")
- controllerInformation.node match {
- case Some(controllerNode) =>
- info(s"Recorded new KRaft controller, from now on will use node
$controllerNode")
- updateControllerAddress(controllerNode)
- metadataUpdater.setNodes(Seq(controllerNode).asJava)
- case None =>
- // need to backoff to avoid tight loops
- debug("No controller provided, retrying after backoff")
- super.pollOnce(100)
- }
- }
- }
-
- override def start(): Unit = {
- super.start()
- started = true
- }
-}
diff --git
a/core/src/test/scala/kafka/server/NodeToControllerRequestThreadTest.scala
b/core/src/test/scala/kafka/server/NodeToControllerRequestThreadTest.scala
index 46c7237dafb..a90f9034d02 100644
--- a/core/src/test/scala/kafka/server/NodeToControllerRequestThreadTest.scala
+++ b/core/src/test/scala/kafka/server/NodeToControllerRequestThreadTest.scala
@@ -31,15 +31,22 @@ import
org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import
org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
import org.apache.kafka.common.utils.MockTime
import org.apache.kafka.server.common.ControllerRequestCompletionHandler
+import org.apache.kafka.server.ControllerInformation
+import org.apache.kafka.server.NodeToControllerRequestThread
+import org.apache.kafka.server.NodeToControllerQueueItem
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
import org.mockito.Mockito._
+import java.util.Optional
+import java.util.function.Supplier
+import scala.jdk.OptionConverters.RichOption
+
class NodeToControllerRequestThreadTest {
private def controllerInfo(node: Option[Node]): ControllerInformation = {
- ControllerInformation(node, new ListenerName(""),
SecurityProtocol.PLAINTEXT, "")
+ new ControllerInformation(node.toJava, new ListenerName(""),
SecurityProtocol.PLAINTEXT, "")
}
private def emptyControllerInfo: ControllerInformation = {
@@ -52,18 +59,18 @@ class NodeToControllerRequestThreadTest {
val config = new KafkaConfig(TestUtils.createBrokerConfig(1))
val metadata = mock(classOf[Metadata])
val mockClient = new MockClient(time, metadata)
- val controllerNodeProvider = mock(classOf[ControllerNodeProvider])
+ val controllerNodeProvider = mock(classOf[Supplier[ControllerInformation]])
-
when(controllerNodeProvider.getControllerInfo()).thenReturn(emptyControllerInfo)
+ when(controllerNodeProvider.get()).thenReturn(emptyControllerInfo)
val retryTimeoutMs = 30000
val testRequestThread = new NodeToControllerRequestThread(
mockClient, new ManualMetadataUpdater(),
controllerNodeProvider, config, time, "", retryTimeoutMs)
- testRequestThread.started = true
+ testRequestThread.setStarted(true)
val completionHandler = new TestControllerRequestCompletionHandler(None)
- val queueItem = NodeToControllerQueueItem(
+ val queueItem = new NodeToControllerQueueItem(
time.milliseconds(),
new MetadataRequest.Builder(new MetadataRequestData()),
completionHandler
@@ -89,20 +96,20 @@ class NodeToControllerRequestThreadTest {
val metadata = mock(classOf[Metadata])
val mockClient = new MockClient(time, metadata)
- val controllerNodeProvider = mock(classOf[ControllerNodeProvider])
+ val controllerNodeProvider = mock(classOf[Supplier[ControllerInformation]])
val activeController = new Node(controllerId, "host", 1234)
-
when(controllerNodeProvider.getControllerInfo()).thenReturn(controllerInfo(Some(activeController)))
+
when(controllerNodeProvider.get()).thenReturn(controllerInfo(Some(activeController)))
val expectedResponse = RequestTestUtils.metadataUpdateWith(2,
java.util.Map.of("a", 2))
val testRequestThread = new NodeToControllerRequestThread(
mockClient, new ManualMetadataUpdater(),
- controllerNodeProvider, config, time, "", retryTimeoutMs = Long.MaxValue)
- testRequestThread.started = true
+ controllerNodeProvider, config, time, "", Long.MaxValue)
+ testRequestThread.setStarted(true)
mockClient.prepareResponse(expectedResponse)
val completionHandler = new
TestControllerRequestCompletionHandler(Some(expectedResponse))
- val queueItem = NodeToControllerQueueItem(
+ val queueItem = new NodeToControllerQueueItem(
time.milliseconds(),
new MetadataRequest.Builder(new MetadataRequestData()),
completionHandler
@@ -131,21 +138,21 @@ class NodeToControllerRequestThreadTest {
val metadata = mock(classOf[Metadata])
val mockClient = new MockClient(time, metadata)
- val controllerNodeProvider = mock(classOf[ControllerNodeProvider])
+ val controllerNodeProvider = mock(classOf[Supplier[ControllerInformation]])
val oldController = new Node(oldControllerId, "host1", 1234)
val newController = new Node(newControllerId, "host2", 1234)
- when(controllerNodeProvider.getControllerInfo()).thenReturn(
+ when(controllerNodeProvider.get()).thenReturn(
controllerInfo(Some(oldController)), controllerInfo(Some(newController)))
val expectedResponse = RequestTestUtils.metadataUpdateWith(3,
java.util.Map.of("a", 2))
val testRequestThread = new NodeToControllerRequestThread(
mockClient, new ManualMetadataUpdater(),
- controllerNodeProvider, config, time, "", retryTimeoutMs = Long.MaxValue)
- testRequestThread.started = true
+ controllerNodeProvider, config, time, "", Long.MaxValue)
+ testRequestThread.setStarted(true)
val completionHandler = new
TestControllerRequestCompletionHandler(Some(expectedResponse))
- val queueItem = NodeToControllerQueueItem(
+ val queueItem = new NodeToControllerQueueItem(
time.milliseconds(),
new MetadataRequest.Builder(new MetadataRequestData()),
completionHandler,
@@ -179,12 +186,12 @@ class NodeToControllerRequestThreadTest {
val metadata = mock(classOf[Metadata])
val mockClient = new MockClient(time, metadata)
- val controllerNodeProvider = mock(classOf[ControllerNodeProvider])
+ val controllerNodeProvider = mock(classOf[Supplier[ControllerInformation]])
val port = 1234
val oldController = new Node(oldControllerId, "host1", port)
val newController = new Node(newControllerId, "host2", port)
- when(controllerNodeProvider.getControllerInfo()).thenReturn(
+ when(controllerNodeProvider.get()).thenReturn(
controllerInfo(Some(oldController)), controllerInfo(Some(newController)))
val responseWithNotControllerError =
RequestTestUtils.metadataUpdateWith("cluster1", 2,
@@ -193,11 +200,11 @@ class NodeToControllerRequestThreadTest {
val expectedResponse = RequestTestUtils.metadataUpdateWith(3,
java.util.Map.of("a", 2))
val testRequestThread = new NodeToControllerRequestThread(
mockClient, new ManualMetadataUpdater(),
- controllerNodeProvider, config, time, "", retryTimeoutMs = Long.MaxValue)
- testRequestThread.started = true
+ controllerNodeProvider, config, time, "", Long.MaxValue)
+ testRequestThread.setStarted(true)
val completionHandler = new
TestControllerRequestCompletionHandler(Some(expectedResponse))
- val queueItem = NodeToControllerQueueItem(
+ val queueItem = new NodeToControllerQueueItem(
time.milliseconds(),
new MetadataRequest.Builder(new MetadataRequestData()
.setAllowAutoTopicCreation(true)),
@@ -208,7 +215,7 @@ class NodeToControllerRequestThreadTest {
testRequestThread.doWork()
val oldBrokerNode = new Node(oldControllerId, "host1", port)
- assertEquals(Some(oldBrokerNode),
testRequestThread.activeControllerAddress())
+ assertEquals(Some(oldBrokerNode).toJava,
testRequestThread.activeControllerAddress())
// send and process the request
mockClient.prepareResponse((body: AbstractRequest) => {
@@ -216,7 +223,7 @@ class NodeToControllerRequestThreadTest {
body.asInstanceOf[MetadataRequest].allowAutoTopicCreation()
}, responseWithNotControllerError)
testRequestThread.doWork()
- assertEquals(None, testRequestThread.activeControllerAddress())
+ assertEquals(Optional.empty(), testRequestThread.activeControllerAddress())
// reinitialize the controller to a different node
testRequestThread.doWork()
// process the request again
@@ -224,7 +231,7 @@ class NodeToControllerRequestThreadTest {
testRequestThread.doWork()
val newControllerNode = new Node(newControllerId, "host2", port)
- assertEquals(Some(newControllerNode),
testRequestThread.activeControllerAddress())
+ assertEquals(Some(newControllerNode).toJava,
testRequestThread.activeControllerAddress())
assertTrue(completionHandler.completed.get())
}
@@ -241,12 +248,12 @@ class NodeToControllerRequestThreadTest {
// enable envelope API
mockClient.setNodeApiVersions(NodeApiVersions.create(ApiKeys.ENVELOPE.id,
0.toShort, 0.toShort))
- val controllerNodeProvider = mock(classOf[ControllerNodeProvider])
+ val controllerNodeProvider = mock(classOf[Supplier[ControllerInformation]])
val port = 1234
val oldController = new Node(oldControllerId, "host1", port)
val newController = new Node(newControllerId, "host2", port)
- when(controllerNodeProvider.getControllerInfo()).thenReturn(
+ when(controllerNodeProvider.get()).thenReturn(
controllerInfo(Some(oldController)),
controllerInfo(Some(newController))
)
@@ -260,8 +267,8 @@ class NodeToControllerRequestThreadTest {
val testRequestThread = new NodeToControllerRequestThread(
mockClient, new ManualMetadataUpdater(),
- controllerNodeProvider, config, time, "", retryTimeoutMs = Long.MaxValue)
- testRequestThread.started = true
+ controllerNodeProvider, config, time, "", Long.MaxValue)
+ testRequestThread.setStarted(true)
val completionHandler = new
TestControllerRequestCompletionHandler(Some(expectedResponse))
val kafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE,
"principal", true)
@@ -271,7 +278,7 @@ class NodeToControllerRequestThreadTest {
val envelopeRequestBuilder = new
EnvelopeRequest.Builder(ByteBuffer.allocate(0),
kafkaPrincipalBuilder.serialize(kafkaPrincipal),
"client-address".getBytes)
- val queueItem = NodeToControllerQueueItem(
+ val queueItem = new NodeToControllerQueueItem(
time.milliseconds(),
envelopeRequestBuilder,
completionHandler
@@ -282,7 +289,7 @@ class NodeToControllerRequestThreadTest {
testRequestThread.doWork()
val oldBrokerNode = new Node(oldControllerId, "host1", port)
- assertEquals(Some(oldBrokerNode),
testRequestThread.activeControllerAddress())
+ assertEquals(Some(oldBrokerNode).toJava,
testRequestThread.activeControllerAddress())
// send and process the envelope request
mockClient.prepareResponse((body: AbstractRequest) => {
@@ -290,7 +297,7 @@ class NodeToControllerRequestThreadTest {
}, envelopeResponseWithNotControllerError)
testRequestThread.doWork()
// expect to reset the activeControllerAddress after finding the
NOT_CONTROLLER error
- assertEquals(None, testRequestThread.activeControllerAddress())
+ assertEquals(Optional.empty(), testRequestThread.activeControllerAddress())
// reinitialize the controller to a different node
testRequestThread.doWork()
// process the request again
@@ -298,7 +305,7 @@ class NodeToControllerRequestThreadTest {
testRequestThread.doWork()
val newControllerNode = new Node(newControllerId, "host2", port)
- assertEquals(Some(newControllerNode),
testRequestThread.activeControllerAddress())
+ assertEquals(Some(newControllerNode).toJava,
testRequestThread.activeControllerAddress())
assertTrue(completionHandler.completed.get())
}
@@ -312,10 +319,10 @@ class NodeToControllerRequestThreadTest {
val metadata = mock(classOf[Metadata])
val mockClient = new MockClient(time, metadata)
- val controllerNodeProvider = mock(classOf[ControllerNodeProvider])
+ val controllerNodeProvider = mock(classOf[Supplier[ControllerInformation]])
val controller = new Node(controllerId, "host1", 1234)
-
when(controllerNodeProvider.getControllerInfo()).thenReturn(controllerInfo(Some(controller)))
+
when(controllerNodeProvider.get()).thenReturn(controllerInfo(Some(controller)))
val retryTimeoutMs = 30000
val responseWithNotControllerError =
RequestTestUtils.metadataUpdateWith("cluster1", 2,
@@ -324,10 +331,10 @@ class NodeToControllerRequestThreadTest {
val testRequestThread = new NodeToControllerRequestThread(
mockClient, new ManualMetadataUpdater(),
controllerNodeProvider, config, time, "", retryTimeoutMs)
- testRequestThread.started = true
+ testRequestThread.setStarted(true)
val completionHandler = new TestControllerRequestCompletionHandler()
- val queueItem = NodeToControllerQueueItem(
+ val queueItem = new NodeToControllerQueueItem(
time.milliseconds(),
new MetadataRequest.Builder(new MetadataRequestData()
.setAllowAutoTopicCreation(true)),
@@ -361,10 +368,10 @@ class NodeToControllerRequestThreadTest {
val metadata = mock(classOf[Metadata])
val mockClient = new MockClient(time, metadata)
- val controllerNodeProvider = mock(classOf[ControllerNodeProvider])
+ val controllerNodeProvider = mock(classOf[Supplier[ControllerInformation]])
val activeController = new Node(controllerId, "host", 1234)
-
when(controllerNodeProvider.getControllerInfo()).thenReturn(controllerInfo(Some(activeController)))
+
when(controllerNodeProvider.get()).thenReturn(controllerInfo(Some(activeController)))
val callbackResponse = new AtomicReference[ClientResponse]()
val completionHandler = new ControllerRequestCompletionHandler {
@@ -372,7 +379,7 @@ class NodeToControllerRequestThreadTest {
override def onComplete(response: ClientResponse): Unit =
callbackResponse.set(response)
}
- val queueItem = NodeToControllerQueueItem(
+ val queueItem = new NodeToControllerQueueItem(
time.milliseconds(),
new MetadataRequest.Builder(new MetadataRequestData()),
completionHandler
@@ -382,8 +389,8 @@ class NodeToControllerRequestThreadTest {
val testRequestThread = new NodeToControllerRequestThread(
mockClient, new ManualMetadataUpdater(),
- controllerNodeProvider, config, time, "", retryTimeoutMs = Long.MaxValue)
- testRequestThread.started = true
+ controllerNodeProvider, config, time, "", Long.MaxValue)
+ testRequestThread.setStarted(true)
testRequestThread.enqueue(queueItem)
pollUntil(testRequestThread, () => callbackResponse.get != null)
@@ -399,10 +406,10 @@ class NodeToControllerRequestThreadTest {
val metadata = mock(classOf[Metadata])
val mockClient = new MockClient(time, metadata)
- val controllerNodeProvider = mock(classOf[ControllerNodeProvider])
+ val controllerNodeProvider = mock(classOf[Supplier[ControllerInformation]])
val activeController = new Node(controllerId, "host", 1234)
-
when(controllerNodeProvider.getControllerInfo()).thenReturn(controllerInfo(Some(activeController)))
+
when(controllerNodeProvider.get()).thenReturn(controllerInfo(Some(activeController)))
val callbackResponse = new AtomicReference[ClientResponse]()
val completionHandler = new ControllerRequestCompletionHandler {
@@ -410,7 +417,7 @@ class NodeToControllerRequestThreadTest {
override def onComplete(response: ClientResponse): Unit =
callbackResponse.set(response)
}
- val queueItem = NodeToControllerQueueItem(
+ val queueItem = new NodeToControllerQueueItem(
time.milliseconds(),
new MetadataRequest.Builder(new MetadataRequestData()),
completionHandler
@@ -420,13 +427,13 @@ class NodeToControllerRequestThreadTest {
val testRequestThread = new NodeToControllerRequestThread(
mockClient, new ManualMetadataUpdater(),
- controllerNodeProvider, config, time, "", retryTimeoutMs = Long.MaxValue)
- testRequestThread.started = true
+ controllerNodeProvider, config, time, "", Long.MaxValue)
+ testRequestThread.setStarted(true)
testRequestThread.enqueue(queueItem)
pollUntil(testRequestThread, () => callbackResponse.get != null)
assertNotNull(callbackResponse.get.authenticationException)
- assertEquals(None, testRequestThread.activeControllerAddress())
+ assertEquals(Optional.empty(), testRequestThread.activeControllerAddress())
}
@Test
@@ -438,15 +445,15 @@ class NodeToControllerRequestThreadTest {
val metadata = mock(classOf[Metadata])
val mockClient = new MockClient(time, metadata)
- val controllerNodeProvider = mock(classOf[ControllerNodeProvider])
-
when(controllerNodeProvider.getControllerInfo()).thenReturn(emptyControllerInfo)
+ val controllerNodeProvider = mock(classOf[Supplier[ControllerInformation]])
+ when(controllerNodeProvider.get()).thenReturn(emptyControllerInfo)
val testRequestThread = new NodeToControllerRequestThread(
mockClient, new ManualMetadataUpdater(),
- controllerNodeProvider, config, time, "", retryTimeoutMs = Long.MaxValue)
+ controllerNodeProvider, config, time, "", Long.MaxValue)
val completionHandler = new TestControllerRequestCompletionHandler(None)
- val queueItem = NodeToControllerQueueItem(
+ val queueItem = new NodeToControllerQueueItem(
time.milliseconds(),
new MetadataRequest.Builder(new MetadataRequestData()),
completionHandler
diff --git a/core/src/test/scala/unit/kafka/server/ForwardingManagerTest.scala
b/core/src/test/scala/unit/kafka/server/ForwardingManagerTest.scala
index 16e4b2bcb66..46815eaaa4b 100644
--- a/core/src/test/scala/unit/kafka/server/ForwardingManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ForwardingManagerTest.scala
@@ -37,16 +37,18 @@ import
org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import
org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
import org.apache.kafka.network.metrics.RequestChannelMetrics
import org.apache.kafka.server.util.MockTime
+import org.apache.kafka.server.ControllerInformation
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
import org.mockito.Mockito
+import java.util.function.Supplier
import scala.jdk.CollectionConverters._
class ForwardingManagerTest {
private val time = new MockTime()
private val client = new MockClient(time)
- private val controllerNodeProvider =
Mockito.mock(classOf[ControllerNodeProvider])
+ private val controllerNodeProvider =
Mockito.mock(classOf[Supplier[ControllerInformation]])
private val brokerToController = new MockNodeToControllerChannelManager(
client, time, controllerNodeProvider, controllerApiVersions)
private val metrics = new Metrics()
@@ -66,11 +68,11 @@ class ForwardingManagerTest {
}
private def controllerInfo = {
- ControllerInformation(Some(new Node(0, "host", 1234)), new
ListenerName(""), SecurityProtocol.PLAINTEXT, "")
+ new ControllerInformation(Optional.of(new Node(0, "host", 1234)), new
ListenerName(""), SecurityProtocol.PLAINTEXT, "")
}
private def emptyControllerInfo = {
- ControllerInformation(None, new ListenerName(""),
SecurityProtocol.PLAINTEXT, "")
+ new ControllerInformation(Optional.empty(), new ListenerName(""),
SecurityProtocol.PLAINTEXT, "")
}
@Test
@@ -84,7 +86,7 @@ class ForwardingManagerTest {
val responseBuffer =
RequestTestUtils.serializeResponseWithHeader(responseBody,
requestHeader.apiVersion,
requestCorrelationId + 1)
-
Mockito.when(controllerNodeProvider.getControllerInfo()).thenReturn(controllerInfo)
+ Mockito.when(controllerNodeProvider.get()).thenReturn(controllerInfo)
val isEnvelopeRequest: RequestMatcher = request =>
request.isInstanceOf[EnvelopeRequest]
client.prepareResponse(isEnvelopeRequest, new
EnvelopeResponse(responseBuffer, Errors.NONE))
@@ -108,7 +110,7 @@ class ForwardingManagerTest {
val responseBuffer =
RequestTestUtils.serializeResponseWithHeader(responseBody,
requestHeader.apiVersion, requestCorrelationId)
-
Mockito.when(controllerNodeProvider.getControllerInfo()).thenReturn(controllerInfo)
+ Mockito.when(controllerNodeProvider.get()).thenReturn(controllerInfo)
val isEnvelopeRequest: RequestMatcher = request =>
request.isInstanceOf[EnvelopeRequest]
client.prepareResponse(isEnvelopeRequest, new
EnvelopeResponse(responseBuffer, Errors.UNSUPPORTED_VERSION))
@@ -125,7 +127,7 @@ class ForwardingManagerTest {
val (requestHeader, requestBuffer) = buildRequest(testAlterConfigRequest,
requestCorrelationId)
val request = buildRequest(requestHeader, requestBuffer, clientPrincipal)
-
Mockito.when(controllerNodeProvider.getControllerInfo()).thenReturn(emptyControllerInfo)
+ Mockito.when(controllerNodeProvider.get()).thenReturn(emptyControllerInfo)
val response = new AtomicReference[AbstractResponse]()
forwardingManager.forwardRequest(request, res => res.foreach(response.set))
@@ -149,7 +151,7 @@ class ForwardingManagerTest {
val (requestHeader, requestBuffer) = buildRequest(testAlterConfigRequest,
requestCorrelationId)
val request = buildRequest(requestHeader, requestBuffer, clientPrincipal)
-
Mockito.when(controllerNodeProvider.getControllerInfo()).thenReturn(controllerInfo)
+ Mockito.when(controllerNodeProvider.get()).thenReturn(controllerInfo)
val response = new AtomicReference[AbstractResponse]()
forwardingManager.forwardRequest(request, res => res.foreach(response.set))
@@ -175,7 +177,7 @@ class ForwardingManagerTest {
val (requestHeader, requestBuffer) = buildRequest(testAlterConfigRequest,
requestCorrelationId)
val request = buildRequest(requestHeader, requestBuffer, clientPrincipal)
-
Mockito.when(controllerNodeProvider.getControllerInfo()).thenReturn(controllerInfo)
+ Mockito.when(controllerNodeProvider.get()).thenReturn(controllerInfo)
val isEnvelopeRequest: RequestMatcher = request =>
request.isInstanceOf[EnvelopeRequest]
client.prepareUnsupportedVersionResponse(isEnvelopeRequest)
@@ -196,7 +198,7 @@ class ForwardingManagerTest {
val (requestHeader, requestBuffer) = buildRequest(testAlterConfigRequest,
requestCorrelationId)
val request = buildRequest(requestHeader, requestBuffer, clientPrincipal)
-
Mockito.when(controllerNodeProvider.getControllerInfo()).thenReturn(controllerInfo)
+ Mockito.when(controllerNodeProvider.get()).thenReturn(controllerInfo)
client.createPendingAuthenticationError(controllerInfo.node.get, 50)
@@ -220,7 +222,7 @@ class ForwardingManagerTest {
val responseBuffer =
RequestTestUtils.serializeResponseWithHeader(responseBody,
requestHeader.apiVersion, requestCorrelationId)
-
Mockito.when(controllerNodeProvider.getControllerInfo()).thenReturn(controllerInfo)
+ Mockito.when(controllerNodeProvider.get()).thenReturn(controllerInfo)
val isEnvelopeRequest: RequestMatcher = request =>
request.isInstanceOf[EnvelopeRequest]
client.prepareResponse(isEnvelopeRequest, new
EnvelopeResponse(responseBuffer, Errors.UNSUPPORTED_VERSION))
@@ -242,7 +244,7 @@ class ForwardingManagerTest {
val (requestHeader, requestBuffer) = buildRequest(testAlterConfigRequest,
requestCorrelationId)
val request = buildRequest(requestHeader, requestBuffer, clientPrincipal)
-
Mockito.when(controllerNodeProvider.getControllerInfo()).thenReturn(controllerInfo)
+ Mockito.when(controllerNodeProvider.get()).thenReturn(controllerInfo)
val response = new AtomicReference[AbstractResponse]()
forwardingManager.forwardRequest(request, res => res.foreach(response.set))
diff --git
a/core/src/test/scala/unit/kafka/server/MockNodeToControllerChannelManager.scala
b/core/src/test/scala/unit/kafka/server/MockNodeToControllerChannelManager.scala
index 633a2ffc80f..2a162d64bf3 100644
---
a/core/src/test/scala/unit/kafka/server/MockNodeToControllerChannelManager.scala
+++
b/core/src/test/scala/unit/kafka/server/MockNodeToControllerChannelManager.scala
@@ -21,13 +21,16 @@ import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.AbstractRequest
import org.apache.kafka.server.common.{ControllerRequestCompletionHandler,
NodeToControllerChannelManager}
import org.apache.kafka.server.util.MockTime
+import org.apache.kafka.server.{ControllerInformation,
NodeToControllerQueueItem}
import java.util.Optional
+import java.util.function.Supplier
+import scala.jdk.OptionConverters._
class MockNodeToControllerChannelManager(
val client: MockClient,
time: MockTime,
- controllerNodeProvider: ControllerNodeProvider,
+ controllerNodeProvider: Supplier[ControllerInformation],
controllerApiVersions: NodeApiVersions = NodeApiVersions.create(),
val retryTimeoutMs: Int = 60000,
val requestTimeoutMs: Int = 30000
@@ -44,10 +47,10 @@ class MockNodeToControllerChannelManager(
request: AbstractRequest.Builder[_ <: AbstractRequest],
callback: ControllerRequestCompletionHandler
): Unit = {
- unsentQueue.add(NodeToControllerQueueItem(
- createdTimeMs = time.milliseconds(),
- request = request,
- callback = callback
+ unsentQueue.add(new NodeToControllerQueueItem(
+ time.milliseconds(),
+ request,
+ callback
))
}
@@ -78,7 +81,7 @@ class MockNodeToControllerChannelManager(
queueItem.callback.onTimeout()
unsentIterator.remove()
} else {
- controllerNodeProvider.getControllerInfo().node match {
+ controllerNodeProvider.get().node.toScala match {
case Some(controller) if client.ready(controller,
time.milliseconds()) =>
val clientRequest = client.newClientRequest(
controller.idString,
@@ -86,7 +89,7 @@ class MockNodeToControllerChannelManager(
queueItem.createdTimeMs,
true, // we expect response,
requestTimeoutMs,
- handleResponse(queueItem)
+ handleResponse(queueItem) _
)
client.send(clientRequest, time.milliseconds())
unsentIterator.remove()
diff --git
a/core/src/test/scala/unit/kafka/server/RegistrationTestContext.scala
b/core/src/test/scala/unit/kafka/server/RegistrationTestContext.scala
index dd5968055e0..b29498b92b4 100644
--- a/core/src/test/scala/unit/kafka/server/RegistrationTestContext.scala
+++ b/core/src/test/scala/unit/kafka/server/RegistrationTestContext.scala
@@ -28,12 +28,14 @@ import
org.apache.kafka.common.protocol.ApiKeys.{BROKER_HEARTBEAT, BROKER_REGIST
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.LogContext
import org.apache.kafka.server.util.MockTime
+import org.apache.kafka.server.ControllerInformation
-import java.util.Properties
+import java.util.{Optional, Properties}
import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, AtomicReference}
+import java.util.function.Supplier
import scala.jdk.CollectionConverters._
-class SimpleControllerNodeProvider extends ControllerNodeProvider {
+class SimpleControllerNodeProvider extends Supplier[ControllerInformation] {
val node = new AtomicReference[Node](null)
def listenerName: ListenerName = new ListenerName("PLAINTEXT")
@@ -42,7 +44,7 @@ class SimpleControllerNodeProvider extends
ControllerNodeProvider {
def saslMechanism: String = SaslConfigs.DEFAULT_SASL_MECHANISM
- override def getControllerInfo(): ControllerInformation =
ControllerInformation(Option(node.get()),
+ override def get(): ControllerInformation = new
ControllerInformation(Optional.ofNullable(node.get()),
listenerName, securityProtocol, saslMechanism)
}
diff --git
a/server-common/src/main/java/org/apache/kafka/server/common/NodeToControllerChannelManager.java
b/server-common/src/main/java/org/apache/kafka/server/common/NodeToControllerChannelManager.java
index 159913bf1ca..2c2510cb76b 100644
---
a/server-common/src/main/java/org/apache/kafka/server/common/NodeToControllerChannelManager.java
+++
b/server-common/src/main/java/org/apache/kafka/server/common/NodeToControllerChannelManager.java
@@ -26,7 +26,7 @@ public interface NodeToControllerChannelManager {
void start();
- void shutdown();
+ void shutdown() throws InterruptedException;
Optional<NodeApiVersions> controllerApiVersions();
diff --git
a/server/src/main/java/org/apache/kafka/server/ControllerInformation.java
b/server/src/main/java/org/apache/kafka/server/ControllerInformation.java
new file mode 100644
index 00000000000..93a5517e2d9
--- /dev/null
+++ b/server/src/main/java/org/apache/kafka/server/ControllerInformation.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server;
+
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+
+import java.util.Optional;
+
+/**
+ * Connection information for communicating with the active Kafka controller
in KRaft mode.
+ * <p>
+ * Contains the controller node endpoint and security configuration needed to
establish
+ * a network connection. The node may be absent during cluster initialization
or leader elections.
+ *
+ * @param node The controller node (id, host, port), or empty if controller is
unknown
+ * @param listenerName The listener to use for controller connections
+ * @param securityProtocol The security protocol (PLAINTEXT, SSL,
SASL_PLAINTEXT, SASL_SSL)
+ * @param saslMechanism The SASL mechanism for authentication (e.g., "PLAIN",
"SCRAM-SHA-256")
+ */
+public record ControllerInformation(Optional<Node> node, ListenerName
listenerName, SecurityProtocol securityProtocol,
+ String saslMechanism) {
+}
diff --git
a/server/src/main/java/org/apache/kafka/server/NodeToControllerChannelManagerImpl.java
b/server/src/main/java/org/apache/kafka/server/NodeToControllerChannelManagerImpl.java
new file mode 100644
index 00000000000..c54fb97a65b
--- /dev/null
+++
b/server/src/main/java/org/apache/kafka/server/NodeToControllerChannelManagerImpl.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server;
+
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.ManualMetadataUpdater;
+import org.apache.kafka.clients.MetadataRecoveryStrategy;
+import org.apache.kafka.clients.NetworkClient;
+import org.apache.kafka.clients.NodeApiVersions;
+import org.apache.kafka.common.Reconfigurable;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.network.ChannelBuilder;
+import org.apache.kafka.common.network.ChannelBuilders;
+import org.apache.kafka.common.network.NetworkReceive;
+import org.apache.kafka.common.network.Selectable;
+import org.apache.kafka.common.network.Selector;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.security.JaasContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.raft.KRaftConfigs;
+import org.apache.kafka.server.common.ControllerRequestCompletionHandler;
+import org.apache.kafka.server.common.NodeToControllerChannelManager;
+import org.apache.kafka.server.config.AbstractKafkaConfig;
+import org.apache.kafka.server.config.ReplicationConfigs;
+import org.apache.kafka.server.config.ServerConfigs;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Supplier;
+
+/**
+ * Manages a communication channel from a node to the active KRaft controller.
+ * <p>
+ * Creates a network client with the appropriate security configuration and
uses a background
+ * request thread to queue and send requests asynchronously. Supports dynamic
reconfiguration
+ * of security settings when the controller listener configuration changes.
+ */
+public class NodeToControllerChannelManagerImpl implements
NodeToControllerChannelManager {
+ private static final Logger log =
LoggerFactory.getLogger(NodeToControllerChannelManagerImpl.class);
+ private final Time time;
+ private final Metrics metrics;
+ private final AbstractKafkaConfig config;
+ private final String channelName;
+ private final Long retryTimeoutMs;
+
+ private final LogContext logContext;
+ private final ManualMetadataUpdater manualMetadataUpdater = new
ManualMetadataUpdater();
+ private final ApiVersions apiVersions = new ApiVersions();
+ private final NodeToControllerRequestThread requestThread;
+
+ public NodeToControllerChannelManagerImpl(Supplier<ControllerInformation>
controllerNodeProvider, Time time, Metrics metrics, AbstractKafkaConfig config,
String channelName, String threadNamePrefix, Long retryTimeoutMs) {
+ this.time = time;
+ this.metrics = metrics;
+ this.config = config;
+ this.channelName = channelName;
+ this.retryTimeoutMs = retryTimeoutMs;
+ this.logContext = new
LogContext(String.format("[NodeToControllerChannelManager id=%s name=%s] ",
+ config.getInt(KRaftConfigs.NODE_ID_CONFIG), channelName));
+ String threadName =
String.format("%sto-controller-%s-channel-manager", threadNamePrefix,
channelName);
+ ControllerInformation controllerInformation =
controllerNodeProvider.get();
+ this.requestThread = new NodeToControllerRequestThread(
+ buildNetworkClient(controllerInformation),
+ manualMetadataUpdater,
+ controllerNodeProvider,
+ config,
+ time,
+ threadName,
+ retryTimeoutMs
+ );
+ }
+
+ private KafkaClient buildNetworkClient(ControllerInformation
controllerInfo) {
+ ChannelBuilder channelBuilder = ChannelBuilders.clientChannelBuilder(
+ controllerInfo.securityProtocol(),
+ JaasContext.Type.SERVER,
+ config,
+ controllerInfo.listenerName(),
+ controllerInfo.saslMechanism(),
+ time,
+ logContext
+ );
+ if (channelBuilder instanceof Reconfigurable reconfigurable) {
+ config.addReconfigurable(reconfigurable);
+ }
+ Selector selector = new Selector(
+ NetworkReceive.UNLIMITED,
+ Selector.NO_IDLE_TIMEOUT_MS,
+ metrics,
+ time,
+ channelName,
+ Map.of("BrokerId", String.valueOf(config.brokerId())),
+ false,
+ channelBuilder,
+ logContext
+ );
+ return new NetworkClient(
+ selector,
+ manualMetadataUpdater,
+ String.valueOf(config.brokerId()),
+ 1,
+ 50,
+ 50,
+ Selectable.USE_DEFAULT_BUFFER_SIZE,
+ Selectable.USE_DEFAULT_BUFFER_SIZE,
+ Math.min(Integer.MAX_VALUE, (int)
Math.min(config.getInt(ReplicationConfigs.CONTROLLER_SOCKET_TIMEOUT_MS_CONFIG),
retryTimeoutMs)), // request timeout should not exceed the provided retry
timeout
+
config.getLong(ServerConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG),
+
config.getLong(ServerConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG),
+ time,
+ true,
+ apiVersions,
+ logContext,
+ MetadataRecoveryStrategy.NONE
+ );
+ }
+
+ @Override
+ public void start() {
+ requestThread.start();
+ }
+
+ @Override
+ public void shutdown() throws InterruptedException {
+ requestThread.shutdown();
+ log.info("Node to controller channel manager for {} shutdown",
channelName);
+ }
+
+ @Override
+ public Optional<NodeApiVersions> controllerApiVersions() {
+ return
requestThread.activeControllerAddress().flatMap(activeController ->
+
Optional.ofNullable(apiVersions.get(activeController.idString()))
+ );
+ }
+
+ /**
+ * Send request to the controller.
+ *
+ * @param request The request to be sent.
+ * @param callback Request completion callback.
+ */
+ @Override
+ public void sendRequest(AbstractRequest.Builder<? extends AbstractRequest>
request,
+ ControllerRequestCompletionHandler callback) {
+ requestThread.enqueue(new NodeToControllerQueueItem(
+ time.milliseconds(),
+ request,
+ callback
+ ));
+ }
+
+ @Override
+ public long getTimeoutMs() {
+ return retryTimeoutMs;
+ }
+}
diff --git
a/server-common/src/main/java/org/apache/kafka/server/common/NodeToControllerChannelManager.java
b/server/src/main/java/org/apache/kafka/server/NodeToControllerQueueItem.java
similarity index 54%
copy from
server-common/src/main/java/org/apache/kafka/server/common/NodeToControllerChannelManager.java
copy to
server/src/main/java/org/apache/kafka/server/NodeToControllerQueueItem.java
index 159913bf1ca..25b657fbe16 100644
---
a/server-common/src/main/java/org/apache/kafka/server/common/NodeToControllerChannelManager.java
+++
b/server/src/main/java/org/apache/kafka/server/NodeToControllerQueueItem.java
@@ -15,25 +15,20 @@
* limitations under the License.
*/
-package org.apache.kafka.server.common;
+package org.apache.kafka.server;
-import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.server.common.ControllerRequestCompletionHandler;
-import java.util.Optional;
-
-public interface NodeToControllerChannelManager {
-
- void start();
-
- void shutdown();
-
- Optional<NodeApiVersions> controllerApiVersions();
-
- void sendRequest(
- AbstractRequest.Builder<? extends AbstractRequest> request,
- ControllerRequestCompletionHandler callback
- );
-
- long getTimeoutMs();
+/**
+ * Represents a queued request to be sent to the controller.
+ * Used for timeout tracking and asynchronous completion handling.
+ *
+ * @param createdTimeMs timestamp when this request was created, used for
timeout detection
+ * @param request the request to send to the controller
+ * @param callback handler invoked when the request completes, fails, or times
out
+ */
+public record NodeToControllerQueueItem(Long createdTimeMs,
+ AbstractRequest.Builder<? extends
AbstractRequest> request,
+ ControllerRequestCompletionHandler
callback) {
}
diff --git
a/server/src/main/java/org/apache/kafka/server/NodeToControllerRequestThread.java
b/server/src/main/java/org/apache/kafka/server/NodeToControllerRequestThread.java
new file mode 100644
index 00000000000..e3d870999b2
--- /dev/null
+++
b/server/src/main/java/org/apache/kafka/server/NodeToControllerRequestThread.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.ManualMetadataUpdater;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.server.config.ReplicationConfigs;
+import org.apache.kafka.server.util.InterBrokerSendThread;
+import org.apache.kafka.server.util.RequestAndCompletionHandler;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+
+/**
+ * Background thread that manages to send requests to the active controller.
+ * <p>
+ * Maintains a queue of pending requests and handles automatic retries on
failures,
+ * controller failover, and timeout detection. Requests are re-queued when the
controller
+ * changes or connections are lost.
+ */
+public class NodeToControllerRequestThread extends InterBrokerSendThread {
+ private static final Logger log =
LoggerFactory.getLogger(NodeToControllerRequestThread.class);
+
+ private final LinkedBlockingDeque<NodeToControllerQueueItem> requestQueue
= new LinkedBlockingDeque<>();
+ private final AtomicReference<Node> activeController = new
AtomicReference<>(null);
+
+
+ private final Time time;
+ private final long retryTimeoutMs;
+ private final Supplier<ControllerInformation> controllerNodeProvider;
+ private final ManualMetadataUpdater metadataUpdater;
+
+ // Used for testing
+ volatile boolean started = false;
+ public void setStarted(boolean started) {
+ this.started = started;
+ }
+
+ public NodeToControllerRequestThread(KafkaClient initialNetworkClient,
+ ManualMetadataUpdater metadataUpdater,
+ Supplier<ControllerInformation>
controllerNodeProvider,
+ AbstractConfig config,
+ Time time,
+ String threadName,
+ Long retryTimeoutMs) {
+ super(threadName, initialNetworkClient, Math.min(Integer.MAX_VALUE,
(int)
Math.min(config.getInt(ReplicationConfigs.CONTROLLER_SOCKET_TIMEOUT_MS_CONFIG),
retryTimeoutMs)), time, false);
+ this.time = time;
+ this.controllerNodeProvider = controllerNodeProvider;
+ this.metadataUpdater = metadataUpdater;
+ this.retryTimeoutMs = retryTimeoutMs;
+ }
+
+ public Optional<Node> activeControllerAddress() {
+ return Optional.ofNullable(activeController.get());
+ }
+
+ private void updateControllerAddress(Node newActiveController) {
+ activeController.set(newActiveController);
+ }
+
+ public void enqueue(NodeToControllerQueueItem request) {
+ if (!started) {
+ throw new IllegalStateException("Cannot enqueue a request if the
request thread is not running");
+ }
+ requestQueue.add(request);
+ if (activeControllerAddress().isPresent()) {
+ wakeup();
+ }
+ }
+
+ public int queueSize() {
+ return requestQueue.size();
+ }
+
+ @Override
+ public Collection<RequestAndCompletionHandler> generateRequests() {
+ final long currentTimeMs = time.milliseconds();
+ final Iterator<NodeToControllerQueueItem> requestIter =
requestQueue.iterator();
+ while (requestIter.hasNext()) {
+ var request = requestIter.next();
+ if (currentTimeMs - request.createdTimeMs() >= retryTimeoutMs) {
+ requestIter.remove();
+ request.callback().onTimeout();
+ } else {
+ Optional<Node> controllerAddress = activeControllerAddress();
+ if (controllerAddress.isPresent()) {
+ requestIter.remove();
+ return List.of(new RequestAndCompletionHandler(
+ time.milliseconds(),
+ controllerAddress.get(),
+ request.request(),
+ response -> handleResponse(request, response)
+ ));
+ }
+ }
+ }
+
+ return List.of();
+ }
+
+ void handleResponse(NodeToControllerQueueItem queueItem, ClientResponse
response) {
+ log.debug("Request {} received {}", queueItem.request(), response);
+ if (response.authenticationException() != null) {
+ log.error("Request {} failed due to authentication error with
controller. Disconnecting the " +
+ "connection to the stale controller {}",
+ queueItem.request(),
activeControllerAddress().map(Node::idString).orElse("null"),
+ response.authenticationException()
+ );
+ maybeDisconnectAndUpdateController();
+ queueItem.callback().onComplete(response);
+ } else if (response.versionMismatch() != null) {
+ log.error("Request {} failed due to unsupported version error",
queueItem.request(),
+ response.versionMismatch());
+ queueItem.callback().onComplete(response);
+ } else if (response.wasDisconnected()) {
+ updateControllerAddress(null);
+ requestQueue.addFirst(queueItem);
+ } else if
(response.responseBody().errorCounts().containsKey(Errors.NOT_CONTROLLER)) {
+ log.debug("Request {} received NOT_CONTROLLER exception.
Disconnecting the " +
+ "connection to the stale controller {}",
+ queueItem.request(),
+
activeControllerAddress().map(Node::idString).orElse("null"));
+ maybeDisconnectAndUpdateController();
+ requestQueue.addFirst(queueItem);
+ } else {
+ queueItem.callback().onComplete(response);
+ }
+ }
+
+ private void maybeDisconnectAndUpdateController() {
+ // just close the controller connection and wait for metadata cache
update in doWork
+ activeControllerAddress().ifPresent(controllerAddress -> {
+ try {
+ // We don't care if disconnect has an error, just log it and
get a new network client
+ networkClient.disconnect(controllerAddress.idString());
+ } catch (Throwable t) {
+ log.error("Had an error while disconnecting from
NetworkClient.", t);
+ }
+ updateControllerAddress(null);
+ });
+ }
+
+ @Override
+ public void doWork() {
+ if (activeControllerAddress().isPresent()) {
+ super.pollOnce(Long.MAX_VALUE);
+ } else {
+ log.debug("Controller isn't cached, looking for local metadata
changes");
+ final ControllerInformation controllerInformation =
controllerNodeProvider.get();
+ Optional<Node> nodeOptional = controllerInformation.node();
+ if (nodeOptional.isPresent()) {
+ Node controllerNode = nodeOptional.get();
+ log.info("Recorded new KRaft controller, from now on will use
node {}", controllerNode);
+ updateControllerAddress(controllerNode);
+ metadataUpdater.setNodes(List.of(controllerNode));
+ } else {
+ // need to backoff to avoid tight loops
+ log.debug("No controller provided, retrying after backoff");
+ super.pollOnce(100);
+ }
+ }
+ }
+
+ @Override
+ public void start() {
+ super.start();
+ started = true;
+ }
+}
diff --git
a/server/src/main/java/org/apache/kafka/server/RaftControllerNodeProvider.java
b/server/src/main/java/org/apache/kafka/server/RaftControllerNodeProvider.java
new file mode 100644
index 00000000000..6d4f2313501
--- /dev/null
+++
b/server/src/main/java/org/apache/kafka/server/RaftControllerNodeProvider.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server;
+
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.raft.KRaftConfigs;
+import org.apache.kafka.raft.RaftManager;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.config.AbstractKafkaConfig;
+
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.function.Supplier;
+
+/**
+ * Finds the controller node by checking the metadata log manager.
+ * This provider is used when we are using a Raft-based metadata quorum.
+ */
+public class RaftControllerNodeProvider implements
Supplier<ControllerInformation> {
+
+ private final RaftManager<ApiMessageAndVersion> raftManager;
+ private final ListenerName listenerName;
+ private final SecurityProtocol securityProtocol;
+ private final String saslMechanism;
+
+ public static RaftControllerNodeProvider
create(RaftManager<ApiMessageAndVersion> raftManager, AbstractKafkaConfig
config) {
+ final ListenerName controllerListenerName = new
ListenerName(config.controllerListenerNames().get(0));
+ final SecurityProtocol controllerSecurityProtocol =
Optional.ofNullable(config.effectiveListenerSecurityProtocolMap().get(controllerListenerName))
+ .orElseGet(() ->
SecurityProtocol.forName(controllerListenerName.value()));
+ final String controllerSaslMechanism =
config.getString(KRaftConfigs.SASL_MECHANISM_CONTROLLER_PROTOCOL_CONFIG);
+ return new RaftControllerNodeProvider(raftManager,
controllerListenerName, controllerSecurityProtocol, controllerSaslMechanism);
+ }
+
+ public RaftControllerNodeProvider(RaftManager<ApiMessageAndVersion>
raftManager, ListenerName listenerName, SecurityProtocol securityProtocol,
String saslMechanism) {
+ this.raftManager = raftManager;
+ this.listenerName = listenerName;
+ this.securityProtocol = securityProtocol;
+ this.saslMechanism = saslMechanism;
+ }
+
+ @SuppressWarnings("resource")
+ @Override
+ public ControllerInformation get() {
+ OptionalInt leaderIdOpt =
raftManager.client().leaderAndEpoch().leaderId();
+
+ Optional<Node> node = leaderIdOpt.isPresent()
+ ? raftManager.client().voterNode(leaderIdOpt.getAsInt(),
listenerName)
+ : Optional.empty();
+
+ return new ControllerInformation(node, listenerName, securityProtocol,
saslMechanism);
+ }
+}
diff --git
a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
index 7b77e2962cf..f0d25f3ca42 100644
---
a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
+++
b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
@@ -17,6 +17,7 @@
package org.apache.kafka.server.config;
import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Reconfigurable;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
@@ -207,4 +208,24 @@ public abstract class AbstractKafkaConfig extends
AbstractConfig {
}
return connectionString.substring(0,
firstColon).toUpperCase(Locale.ROOT);
}
+
+ /**
+ * Registers a component for dynamic reconfiguration notifications.
+ * <p>
+ * This method exists to support migration from kafka.server.KafkaConfig
(Scala/core) to AbstractKafkaConfig (Java/server).
+ * When migrating code, replace KafkaConfig references with
AbstractKafkaConfig.
+ *
+ * @param reconfigurable the component to register for configuration
updates
+ */
+ public abstract void addReconfigurable(Reconfigurable reconfigurable);
+
+ /**
+ * Unregisters a component from dynamic reconfiguration notifications.
+ * <p>
+ * This method exists to support migration from kafka.server.KafkaConfig
(Scala/core) to AbstractKafkaConfig (Java/server).
+ * When migrating code, replace KafkaConfig references with
AbstractKafkaConfig.
+ *
+ * @param reconfigurable the component to unregister
+ */
+ public abstract void removeReconfigurable(Reconfigurable reconfigurable);
}
diff --git
a/server/src/test/java/org/apache/kafka/server/BrokerRegistrationRequestTest.java
b/server/src/test/java/org/apache/kafka/server/BrokerRegistrationRequestTest.java
index 827bb60fbe8..3bb6a2199b4 100644
---
a/server/src/test/java/org/apache/kafka/server/BrokerRegistrationRequestTest.java
+++
b/server/src/test/java/org/apache/kafka/server/BrokerRegistrationRequestTest.java
@@ -16,10 +16,8 @@
*/
package org.apache.kafka.server;
-import kafka.server.ControllerInformation;
-import kafka.server.ControllerNodeProvider;
+
import kafka.server.ControllerServer;
-import kafka.server.NodeToControllerChannelManagerImpl;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.common.Node;
@@ -48,8 +46,7 @@ import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-
-import scala.Option;
+import java.util.function.Supplier;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -125,7 +122,7 @@ class BrokerRegistrationRequestTest {
}
@Override
- public void close() {
+ public void close() throws InterruptedException {
channelManager.shutdown();
metrics.close();
}
@@ -213,7 +210,7 @@ class BrokerRegistrationRequestTest {
}
record TestControllerNodeProvider(ClusterInstance clusterInstance)
- implements ControllerNodeProvider {
+ implements Supplier<ControllerInformation> {
public Optional<Node> node() {
return Optional.of(new Node(
@@ -236,9 +233,9 @@ class BrokerRegistrationRequestTest {
}
@Override
- public ControllerInformation getControllerInfo() {
- return ControllerInformation.apply(
- Option.apply(node().orElse(null)),
+ public ControllerInformation get() {
+ return new ControllerInformation(
+ node(),
listenerName(),
securityProtocol(),
saslMechanism()
diff --git
a/server/src/test/java/org/apache/kafka/server/transaction/AddPartitionsToTxnManagerTest.java
b/server/src/test/java/org/apache/kafka/server/transaction/AddPartitionsToTxnManagerTest.java
index 86abcdcb6c2..91b2dc586af 100644
---
a/server/src/test/java/org/apache/kafka/server/transaction/AddPartitionsToTxnManagerTest.java
+++
b/server/src/test/java/org/apache/kafka/server/transaction/AddPartitionsToTxnManagerTest.java
@@ -90,7 +90,17 @@ public class AddPartitionsToTxnManagerTest {
KRaftConfigs.NODE_ID_CONFIG, "1",
KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER"),
Map.of(),
- false) { };
+ false) {
+ @Override
+ public void addReconfigurable(org.apache.kafka.common.Reconfigurable
reconfigurable) {
+ // No-op for test
+ }
+
+ @Override
+ public void
removeReconfigurable(org.apache.kafka.common.Reconfigurable reconfigurable) {
+ // No-op for test
+ }
+ };
private final AddPartitionsToTxnManager addPartitionsToTxnManager =
new AddPartitionsToTxnManager(config, networkClient,
metadataCache, partitionFor, time);