This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/trunk by this push:
     new 1ee715473cb KAFKA-18446 Remove MetadataCacheControllerNodeProvider (#18437)
1ee715473cb is described below

commit 1ee715473cbbed2d46bea2ef720630e6d7471d08
Author: Xuan-Zhang Gong <gongxuanzhangm...@gmail.com>
AuthorDate: Sun Jan 12 02:12:35 2025 +0800

    KAFKA-18446 Remove MetadataCacheControllerNodeProvider (#18437)

    Reviewers: PoAn Yang <pay...@apache.org>, Chia-Ping Tsai <chia7...@gmail.com>
---
 .../server/NodeToControllerChannelManager.scala    | 71 ++--------------------
 .../server/NodeToControllerRequestThreadTest.scala | 20 +++---
 .../server/BrokerRegistrationRequestTest.scala     |  4 +-
 .../unit/kafka/server/ForwardingManagerTest.scala  |  4 +-
 .../kafka/server/RegistrationTestContext.scala     |  2 +-
 .../kafka/server/util/InterBrokerSendThread.java   |  6 +-
 6 files changed, 23 insertions(+), 84 deletions(-)

diff --git a/core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala b/core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala
index 256bc46f211..c353a825503 100644
--- a/core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala
@@ -17,13 +17,9 @@
 package kafka.server
 
-import java.util.concurrent.LinkedBlockingDeque
-import java.util.concurrent.atomic.AtomicReference
 import kafka.raft.RaftManager
-import kafka.server.metadata.ZkMetadataCache
 import kafka.utils.Logging
 import org.apache.kafka.clients._
-import org.apache.kafka.common.{Node, Reconfigurable}
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network._
 import org.apache.kafka.common.protocol.Errors
@@ -31,11 +27,14 @@ import org.apache.kafka.common.requests.AbstractRequest
 import org.apache.kafka.common.security.JaasContext
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.utils.{LogContext, Time}
+import org.apache.kafka.common.{Node, Reconfigurable}
 import org.apache.kafka.server.common.{ApiMessageAndVersion, ControllerRequestCompletionHandler, NodeToControllerChannelManager}
 import org.apache.kafka.server.util.{InterBrokerSendThread, RequestAndCompletionHandler}
 
 import java.util
 import java.util.Optional
+import java.util.concurrent.LinkedBlockingDeque
+import java.util.concurrent.atomic.AtomicReference
 import scala.collection.Seq
 import scala.jdk.CollectionConverters._
 import scala.jdk.OptionConverters.{RichOption, RichOptionalInt}
@@ -44,45 +43,13 @@ case class ControllerInformation(
   node: Option[Node],
   listenerName: ListenerName,
   securityProtocol: SecurityProtocol,
-  saslMechanism: String,
-  isZkController: Boolean
+  saslMechanism: String
 )
 
 trait ControllerNodeProvider {
   def getControllerInfo(): ControllerInformation
 }
 
-class MetadataCacheControllerNodeProvider(
-  val metadataCache: ZkMetadataCache,
-  val config: KafkaConfig,
-  val quorumControllerNodeProvider: () => Option[ControllerInformation]
-) extends ControllerNodeProvider {
-
-  private val zkControllerListenerName = config.interBrokerListenerName
-  private val zkControllerSecurityProtocol = config.interBrokerSecurityProtocol
-  private val zkControllerSaslMechanism = config.saslMechanismInterBrokerProtocol
-
-  val emptyZkControllerInfo = ControllerInformation(
-    None,
-    zkControllerListenerName,
-    zkControllerSecurityProtocol,
-    zkControllerSaslMechanism,
-    isZkController = true)
-
-  override def getControllerInfo(): ControllerInformation = {
-    metadataCache.getControllerId.map {
-      case ZkCachedControllerId(id) => ControllerInformation(
-        metadataCache.getAliveBrokerNode(id, zkControllerListenerName),
-        zkControllerListenerName,
-        zkControllerSecurityProtocol,
-        zkControllerSaslMechanism,
-        isZkController = true)
-      case KRaftCachedControllerId(_) =>
-        quorumControllerNodeProvider.apply().getOrElse(emptyZkControllerInfo)
-    }.getOrElse(emptyZkControllerInfo)
-  }
-}
-
 object RaftControllerNodeProvider {
   def apply(
     raftManager: RaftManager[ApiMessageAndVersion],
@@ -115,7 +82,7 @@ class RaftControllerNodeProvider(
 
   override def getControllerInfo(): ControllerInformation =
     ControllerInformation(raftManager.leaderAndEpoch.leaderId.toScala.flatMap(idToNode),
-      listenerName, securityProtocol, saslMechanism, isZkController = false)
+      listenerName, securityProtocol, saslMechanism)
 }
 
 /**
@@ -198,8 +165,6 @@ class NodeToControllerChannelManagerImpl(
     val controllerInformation = controllerNodeProvider.getControllerInfo()
     new NodeToControllerRequestThread(
       buildNetworkClient(controllerInformation),
-      controllerInformation.isZkController,
-      buildNetworkClient,
       manualMetadataUpdater,
       controllerNodeProvider,
       config,
@@ -243,8 +208,6 @@ case class NodeToControllerQueueItem(
 
 class NodeToControllerRequestThread(
   initialNetworkClient: KafkaClient,
-  var isNetworkClientForZkController: Boolean,
-  networkClientFactory: ControllerInformation => KafkaClient,
   metadataUpdater: ManualMetadataUpdater,
   controllerNodeProvider: ControllerNodeProvider,
   config: KafkaConfig,
@@ -261,22 +224,6 @@ class NodeToControllerRequestThread(
 
   this.logIdent = logPrefix
 
-  private def maybeResetNetworkClient(controllerInformation: ControllerInformation): Unit = {
-    if (isNetworkClientForZkController != controllerInformation.isZkController) {
-      debug("Controller changed to " + (if (isNetworkClientForZkController) "kraft" else "zk") + " mode. " +
-        s"Resetting network client with new controller information : ${controllerInformation}")
-      // Close existing network client.
-      val oldClient = networkClient
-      oldClient.initiateClose()
-      oldClient.close()
-
-      isNetworkClientForZkController = controllerInformation.isZkController
-      updateControllerAddress(controllerInformation.node.orNull)
-      controllerInformation.node.foreach(n => metadataUpdater.setNodes(Seq(n).asJava))
-      networkClient = networkClientFactory(controllerInformation)
-    }
-  }
-
   private val requestQueue = new LinkedBlockingDeque[NodeToControllerQueueItem]()
   private val activeController = new AtomicReference[Node](null)
 
@@ -370,19 +317,13 @@ class NodeToControllerRequestThread(
 
   override def doWork(): Unit = {
     val controllerInformation = controllerNodeProvider.getControllerInfo()
-    maybeResetNetworkClient(controllerInformation)
     if (activeControllerAddress().isDefined) {
       super.pollOnce(Long.MaxValue)
     } else {
       debug("Controller isn't cached, looking for local metadata changes")
       controllerInformation.node match {
         case Some(controllerNode) =>
-          val controllerType = if (controllerInformation.isZkController) {
-            "ZK"
-          } else {
-            "KRaft"
-          }
-          info(s"Recorded new $controllerType controller, from now on will use node $controllerNode")
+          info(s"Recorded new KRaft controller, from now on will use node $controllerNode")
           updateControllerAddress(controllerNode)
           metadataUpdater.setNodes(Seq(controllerNode).asJava)
         case None =>
diff --git a/core/src/test/scala/kafka/server/NodeToControllerRequestThreadTest.scala b/core/src/test/scala/kafka/server/NodeToControllerRequestThreadTest.scala
index 8c820e41da4..cb08a021e2c 100644
--- a/core/src/test/scala/kafka/server/NodeToControllerRequestThreadTest.scala
+++ b/core/src/test/scala/kafka/server/NodeToControllerRequestThreadTest.scala
@@ -40,7 +40,7 @@ import org.mockito.Mockito._
 class NodeToControllerRequestThreadTest {
 
   private def controllerInfo(node: Option[Node]): ControllerInformation = {
-    ControllerInformation(node, new ListenerName(""), SecurityProtocol.PLAINTEXT, "", isZkController = true)
+    ControllerInformation(node, new ListenerName(""), SecurityProtocol.PLAINTEXT, "")
   }
 
   private def emptyControllerInfo: ControllerInformation = {
@@ -59,7 +59,7 @@ class NodeToControllerRequestThreadTest {
 
     val retryTimeoutMs = 30000
     val testRequestThread = new NodeToControllerRequestThread(
-      mockClient, isNetworkClientForZkController = true, _ => mockClient, new ManualMetadataUpdater(),
+      mockClient, new ManualMetadataUpdater(),
       controllerNodeProvider, config, time, "", retryTimeoutMs)
     testRequestThread.started = true
 
@@ -97,7 +97,7 @@ class NodeToControllerRequestThreadTest {
 
     val expectedResponse = RequestTestUtils.metadataUpdateWith(2, Collections.singletonMap("a", 2))
     val testRequestThread = new NodeToControllerRequestThread(
-      mockClient, isNetworkClientForZkController = true, _ => mockClient, new ManualMetadataUpdater(),
+      mockClient, new ManualMetadataUpdater(),
       controllerNodeProvider, config, time, "", retryTimeoutMs = Long.MaxValue)
     testRequestThread.started = true
     mockClient.prepareResponse(expectedResponse)
@@ -141,7 +141,7 @@ class NodeToControllerRequestThreadTest {
 
     val expectedResponse = RequestTestUtils.metadataUpdateWith(3, Collections.singletonMap("a", 2))
     val testRequestThread = new NodeToControllerRequestThread(
-      mockClient, isNetworkClientForZkController = true, _ => mockClient, new ManualMetadataUpdater(),
+      mockClient, new ManualMetadataUpdater(),
       controllerNodeProvider, config, time, "", retryTimeoutMs = Long.MaxValue)
     testRequestThread.started = true
 
@@ -193,7 +193,7 @@ class NodeToControllerRequestThreadTest {
       Collections.singletonMap("a", 2))
     val expectedResponse = RequestTestUtils.metadataUpdateWith(3, Collections.singletonMap("a", 2))
     val testRequestThread = new NodeToControllerRequestThread(
-      mockClient, isNetworkClientForZkController = true,_ => mockClient, new ManualMetadataUpdater(),
+      mockClient, new ManualMetadataUpdater(),
       controllerNodeProvider, config, time, "", retryTimeoutMs = Long.MaxValue)
     testRequestThread.started = true
 
@@ -260,7 +260,7 @@ class NodeToControllerRequestThreadTest {
 
     val expectedResponse = RequestTestUtils.metadataUpdateWith(3, Collections.singletonMap("a", 2))
     val testRequestThread = new NodeToControllerRequestThread(
-      mockClient, isNetworkClientForZkController = true, _ => mockClient, new ManualMetadataUpdater(),
+      mockClient, new ManualMetadataUpdater(),
       controllerNodeProvider, config, time, "", retryTimeoutMs = Long.MaxValue)
     testRequestThread.started = true
 
@@ -323,7 +323,7 @@ class NodeToControllerRequestThreadTest {
       Collections.singletonMap("a", Errors.NOT_CONTROLLER),
       Collections.singletonMap("a", 2))
     val testRequestThread = new NodeToControllerRequestThread(
-      mockClient, isNetworkClientForZkController = true,_ => mockClient, new ManualMetadataUpdater(),
+      mockClient, new ManualMetadataUpdater(),
       controllerNodeProvider, config, time, "", retryTimeoutMs)
     testRequestThread.started = true
 
@@ -382,7 +382,7 @@ class NodeToControllerRequestThreadTest {
 
     mockClient.prepareUnsupportedVersionResponse(request => request.apiKey == ApiKeys.METADATA)
     val testRequestThread = new NodeToControllerRequestThread(
-      mockClient, isNetworkClientForZkController = true, _ => mockClient, new ManualMetadataUpdater(),
+      mockClient, new ManualMetadataUpdater(),
       controllerNodeProvider, config, time, "", retryTimeoutMs = Long.MaxValue)
     testRequestThread.started = true
 
@@ -420,7 +420,7 @@ class NodeToControllerRequestThreadTest {
 
     mockClient.createPendingAuthenticationError(activeController, 50)
     val testRequestThread = new NodeToControllerRequestThread(
-      mockClient, isNetworkClientForZkController = true, _ => mockClient, new ManualMetadataUpdater(),
+      mockClient, new ManualMetadataUpdater(),
       controllerNodeProvider, config, time, "", retryTimeoutMs = Long.MaxValue)
     testRequestThread.started = true
 
@@ -443,7 +443,7 @@ class NodeToControllerRequestThreadTest {
     when(controllerNodeProvider.getControllerInfo()).thenReturn(emptyControllerInfo)
 
     val testRequestThread = new NodeToControllerRequestThread(
-      mockClient, isNetworkClientForZkController = true, _ => mockClient, new ManualMetadataUpdater(),
+      mockClient, new ManualMetadataUpdater(),
       controllerNodeProvider, config, time, "", retryTimeoutMs = Long.MaxValue)
 
     val completionHandler = new TestControllerRequestCompletionHandler(None)
diff --git a/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala b/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala
index 5456ab1f69d..7d40f34fd9d 100644
--- a/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala
@@ -57,10 +57,8 @@ class BrokerRegistrationRequestTest {
 
         val saslMechanism: String = ""
 
-        def isZkController: Boolean = false
-
         override def getControllerInfo(): ControllerInformation =
-          ControllerInformation(node, listenerName, securityProtocol, saslMechanism, isZkController)
+          ControllerInformation(node, listenerName, securityProtocol, saslMechanism)
       },
       Time.SYSTEM,
       new Metrics(),
diff --git a/core/src/test/scala/unit/kafka/server/ForwardingManagerTest.scala b/core/src/test/scala/unit/kafka/server/ForwardingManagerTest.scala
index 26e91d4d3db..d2d8d3e0382 100644
--- a/core/src/test/scala/unit/kafka/server/ForwardingManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ForwardingManagerTest.scala
@@ -66,11 +66,11 @@ class ForwardingManagerTest {
   }
 
   private def controllerInfo = {
-    ControllerInformation(Some(new Node(0, "host", 1234)), new ListenerName(""), SecurityProtocol.PLAINTEXT, "", isZkController = true)
+    ControllerInformation(Some(new Node(0, "host", 1234)), new ListenerName(""), SecurityProtocol.PLAINTEXT, "")
   }
 
   private def emptyControllerInfo = {
-    ControllerInformation(None, new ListenerName(""), SecurityProtocol.PLAINTEXT, "", isZkController = true)
+    ControllerInformation(None, new ListenerName(""), SecurityProtocol.PLAINTEXT, "")
   }
 
   @Test
diff --git a/core/src/test/scala/unit/kafka/server/RegistrationTestContext.scala b/core/src/test/scala/unit/kafka/server/RegistrationTestContext.scala
index 8f00f0aa1c1..9bf4d4d7e00 100644
--- a/core/src/test/scala/unit/kafka/server/RegistrationTestContext.scala
+++ b/core/src/test/scala/unit/kafka/server/RegistrationTestContext.scala
@@ -43,7 +43,7 @@ class SimpleControllerNodeProvider extends ControllerNodeProvider {
   def saslMechanism: String = SaslConfigs.DEFAULT_SASL_MECHANISM
 
   override def getControllerInfo(): ControllerInformation = ControllerInformation(Option(node.get()),
-    listenerName, securityProtocol, saslMechanism, isZkController = false)
+    listenerName, securityProtocol, saslMechanism)
 }
 
 class RegistrationTestContext(
diff --git a/server-common/src/main/java/org/apache/kafka/server/util/InterBrokerSendThread.java b/server-common/src/main/java/org/apache/kafka/server/util/InterBrokerSendThread.java
index 1ef60a547bc..093946eb5f0 100644
--- a/server-common/src/main/java/org/apache/kafka/server/util/InterBrokerSendThread.java
+++ b/server-common/src/main/java/org/apache/kafka/server/util/InterBrokerSendThread.java
@@ -43,13 +43,13 @@ import java.util.Set;
  */
 public abstract class InterBrokerSendThread extends ShutdownableThread {
 
-    protected volatile KafkaClient networkClient;
+    protected final KafkaClient networkClient;
 
     private final int requestTimeoutMs;
     private final Time time;
     private final UnsentRequests unsentRequests;
 
-    public InterBrokerSendThread(
+    protected InterBrokerSendThread(
         String name,
         KafkaClient networkClient,
         int requestTimeoutMs,
@@ -58,7 +58,7 @@ public abstract class InterBrokerSendThread extends ShutdownableThread {
         this(name, networkClient, requestTimeoutMs, time, true);
     }
 
-    public InterBrokerSendThread(
+    protected InterBrokerSendThread(
         String name,
         KafkaClient networkClient,
         int requestTimeoutMs,