This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1ee715473cb KAFKA-18446 Remove MetadataCacheControllerNodeProvider (#18437)
1ee715473cb is described below

commit 1ee715473cbbed2d46bea2ef720630e6d7471d08
Author: Xuan-Zhang Gong <gongxuanzhangm...@gmail.com>
AuthorDate: Sun Jan 12 02:12:35 2025 +0800

    KAFKA-18446 Remove MetadataCacheControllerNodeProvider (#18437)
    
    Reviewers: PoAn Yang <pay...@apache.org>, Chia-Ping Tsai <chia7...@gmail.com>
---
 .../server/NodeToControllerChannelManager.scala    | 71 ++--------------------
 .../server/NodeToControllerRequestThreadTest.scala | 20 +++---
 .../server/BrokerRegistrationRequestTest.scala     |  4 +-
 .../unit/kafka/server/ForwardingManagerTest.scala  |  4 +-
 .../kafka/server/RegistrationTestContext.scala     |  2 +-
 .../kafka/server/util/InterBrokerSendThread.java   |  6 +-
 6 files changed, 23 insertions(+), 84 deletions(-)

diff --git 
a/core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala 
b/core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala
index 256bc46f211..c353a825503 100644
--- a/core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala
@@ -17,13 +17,9 @@
 
 package kafka.server
 
-import java.util.concurrent.LinkedBlockingDeque
-import java.util.concurrent.atomic.AtomicReference
 import kafka.raft.RaftManager
-import kafka.server.metadata.ZkMetadataCache
 import kafka.utils.Logging
 import org.apache.kafka.clients._
-import org.apache.kafka.common.{Node, Reconfigurable}
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network._
 import org.apache.kafka.common.protocol.Errors
@@ -31,11 +27,14 @@ import org.apache.kafka.common.requests.AbstractRequest
 import org.apache.kafka.common.security.JaasContext
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.utils.{LogContext, Time}
+import org.apache.kafka.common.{Node, Reconfigurable}
 import org.apache.kafka.server.common.{ApiMessageAndVersion, 
ControllerRequestCompletionHandler, NodeToControllerChannelManager}
 import org.apache.kafka.server.util.{InterBrokerSendThread, 
RequestAndCompletionHandler}
 
 import java.util
 import java.util.Optional
+import java.util.concurrent.LinkedBlockingDeque
+import java.util.concurrent.atomic.AtomicReference
 import scala.collection.Seq
 import scala.jdk.CollectionConverters._
 import scala.jdk.OptionConverters.{RichOption, RichOptionalInt}
@@ -44,45 +43,13 @@ case class ControllerInformation(
   node: Option[Node],
   listenerName: ListenerName,
   securityProtocol: SecurityProtocol,
-  saslMechanism: String,
-  isZkController: Boolean
+  saslMechanism: String
 )
 
 trait ControllerNodeProvider {
   def getControllerInfo(): ControllerInformation
 }
 
-class MetadataCacheControllerNodeProvider(
-  val metadataCache: ZkMetadataCache,
-  val config: KafkaConfig,
-  val quorumControllerNodeProvider: () => Option[ControllerInformation]
-) extends ControllerNodeProvider {
-
-  private val zkControllerListenerName = config.interBrokerListenerName
-  private val zkControllerSecurityProtocol = config.interBrokerSecurityProtocol
-  private val zkControllerSaslMechanism = 
config.saslMechanismInterBrokerProtocol
-
-  val emptyZkControllerInfo =  ControllerInformation(
-    None,
-    zkControllerListenerName,
-    zkControllerSecurityProtocol,
-    zkControllerSaslMechanism,
-    isZkController = true)
-
-  override def getControllerInfo(): ControllerInformation = {
-    metadataCache.getControllerId.map {
-      case ZkCachedControllerId(id) => ControllerInformation(
-        metadataCache.getAliveBrokerNode(id, zkControllerListenerName),
-        zkControllerListenerName,
-        zkControllerSecurityProtocol,
-        zkControllerSaslMechanism,
-        isZkController = true)
-      case KRaftCachedControllerId(_) =>
-        quorumControllerNodeProvider.apply().getOrElse(emptyZkControllerInfo)
-    }.getOrElse(emptyZkControllerInfo)
-  }
-}
-
 object RaftControllerNodeProvider {
   def apply(
     raftManager: RaftManager[ApiMessageAndVersion],
@@ -115,7 +82,7 @@ class RaftControllerNodeProvider(
 
   override def getControllerInfo(): ControllerInformation =
     
ControllerInformation(raftManager.leaderAndEpoch.leaderId.toScala.flatMap(idToNode),
-      listenerName, securityProtocol, saslMechanism, isZkController = false)
+      listenerName, securityProtocol, saslMechanism)
 }
 
 /**
@@ -198,8 +165,6 @@ class NodeToControllerChannelManagerImpl(
     val controllerInformation = controllerNodeProvider.getControllerInfo()
     new NodeToControllerRequestThread(
       buildNetworkClient(controllerInformation),
-      controllerInformation.isZkController,
-      buildNetworkClient,
       manualMetadataUpdater,
       controllerNodeProvider,
       config,
@@ -243,8 +208,6 @@ case class NodeToControllerQueueItem(
 
 class NodeToControllerRequestThread(
   initialNetworkClient: KafkaClient,
-  var isNetworkClientForZkController: Boolean,
-  networkClientFactory: ControllerInformation => KafkaClient,
   metadataUpdater: ManualMetadataUpdater,
   controllerNodeProvider: ControllerNodeProvider,
   config: KafkaConfig,
@@ -261,22 +224,6 @@ class NodeToControllerRequestThread(
 
   this.logIdent = logPrefix
 
-  private def maybeResetNetworkClient(controllerInformation: 
ControllerInformation): Unit = {
-    if (isNetworkClientForZkController != 
controllerInformation.isZkController) {
-      debug("Controller changed to " + (if (isNetworkClientForZkController) 
"kraft" else "zk") + " mode. " +
-        s"Resetting network client with new controller information : 
${controllerInformation}")
-      // Close existing network client.
-      val oldClient = networkClient
-      oldClient.initiateClose()
-      oldClient.close()
-
-      isNetworkClientForZkController = controllerInformation.isZkController
-      updateControllerAddress(controllerInformation.node.orNull)
-      controllerInformation.node.foreach(n => 
metadataUpdater.setNodes(Seq(n).asJava))
-      networkClient = networkClientFactory(controllerInformation)
-    }
-  }
-
   private val requestQueue = new 
LinkedBlockingDeque[NodeToControllerQueueItem]()
   private val activeController = new AtomicReference[Node](null)
 
@@ -370,19 +317,13 @@ class NodeToControllerRequestThread(
 
   override def doWork(): Unit = {
     val controllerInformation = controllerNodeProvider.getControllerInfo()
-    maybeResetNetworkClient(controllerInformation)
     if (activeControllerAddress().isDefined) {
       super.pollOnce(Long.MaxValue)
     } else {
       debug("Controller isn't cached, looking for local metadata changes")
       controllerInformation.node match {
         case Some(controllerNode) =>
-          val controllerType = if (controllerInformation.isZkController) {
-            "ZK"
-          } else {
-            "KRaft"
-          }
-          info(s"Recorded new $controllerType controller, from now on will use 
node $controllerNode")
+          info(s"Recorded new KRaft controller, from now on will use node 
$controllerNode")
           updateControllerAddress(controllerNode)
           metadataUpdater.setNodes(Seq(controllerNode).asJava)
         case None =>
diff --git 
a/core/src/test/scala/kafka/server/NodeToControllerRequestThreadTest.scala 
b/core/src/test/scala/kafka/server/NodeToControllerRequestThreadTest.scala
index 8c820e41da4..cb08a021e2c 100644
--- a/core/src/test/scala/kafka/server/NodeToControllerRequestThreadTest.scala
+++ b/core/src/test/scala/kafka/server/NodeToControllerRequestThreadTest.scala
@@ -40,7 +40,7 @@ import org.mockito.Mockito._
 class NodeToControllerRequestThreadTest {
 
   private def controllerInfo(node: Option[Node]): ControllerInformation = {
-    ControllerInformation(node, new ListenerName(""), 
SecurityProtocol.PLAINTEXT, "", isZkController = true)
+    ControllerInformation(node, new ListenerName(""), 
SecurityProtocol.PLAINTEXT, "")
   }
 
   private def emptyControllerInfo: ControllerInformation = {
@@ -59,7 +59,7 @@ class NodeToControllerRequestThreadTest {
 
     val retryTimeoutMs = 30000
     val testRequestThread = new NodeToControllerRequestThread(
-      mockClient, isNetworkClientForZkController = true, _ => mockClient, new 
ManualMetadataUpdater(),
+      mockClient,  new ManualMetadataUpdater(),
       controllerNodeProvider, config, time, "", retryTimeoutMs)
     testRequestThread.started = true
 
@@ -97,7 +97,7 @@ class NodeToControllerRequestThreadTest {
 
     val expectedResponse = RequestTestUtils.metadataUpdateWith(2, 
Collections.singletonMap("a", 2))
     val testRequestThread = new NodeToControllerRequestThread(
-      mockClient, isNetworkClientForZkController = true, _ => mockClient, new 
ManualMetadataUpdater(),
+      mockClient,  new ManualMetadataUpdater(),
       controllerNodeProvider, config, time, "", retryTimeoutMs = Long.MaxValue)
     testRequestThread.started = true
     mockClient.prepareResponse(expectedResponse)
@@ -141,7 +141,7 @@ class NodeToControllerRequestThreadTest {
 
     val expectedResponse = RequestTestUtils.metadataUpdateWith(3, 
Collections.singletonMap("a", 2))
     val testRequestThread = new NodeToControllerRequestThread(
-      mockClient, isNetworkClientForZkController = true, _ => mockClient, new 
ManualMetadataUpdater(),
+      mockClient, new ManualMetadataUpdater(),
       controllerNodeProvider, config, time, "", retryTimeoutMs = Long.MaxValue)
     testRequestThread.started = true
 
@@ -193,7 +193,7 @@ class NodeToControllerRequestThreadTest {
       Collections.singletonMap("a", 2))
     val expectedResponse = RequestTestUtils.metadataUpdateWith(3, 
Collections.singletonMap("a", 2))
     val testRequestThread = new NodeToControllerRequestThread(
-      mockClient, isNetworkClientForZkController = true,_ => mockClient, new 
ManualMetadataUpdater(),
+      mockClient, new ManualMetadataUpdater(),
       controllerNodeProvider, config, time, "", retryTimeoutMs = Long.MaxValue)
     testRequestThread.started = true
 
@@ -260,7 +260,7 @@ class NodeToControllerRequestThreadTest {
     val expectedResponse = RequestTestUtils.metadataUpdateWith(3, 
Collections.singletonMap("a", 2))
 
     val testRequestThread = new NodeToControllerRequestThread(
-      mockClient, isNetworkClientForZkController = true, _ => mockClient, new 
ManualMetadataUpdater(),
+      mockClient,  new ManualMetadataUpdater(),
       controllerNodeProvider, config, time, "", retryTimeoutMs = Long.MaxValue)
     testRequestThread.started = true
 
@@ -323,7 +323,7 @@ class NodeToControllerRequestThreadTest {
       Collections.singletonMap("a", Errors.NOT_CONTROLLER),
       Collections.singletonMap("a", 2))
     val testRequestThread = new NodeToControllerRequestThread(
-      mockClient, isNetworkClientForZkController = true,_ => mockClient, new 
ManualMetadataUpdater(),
+      mockClient, new ManualMetadataUpdater(),
       controllerNodeProvider, config, time, "", retryTimeoutMs)
     testRequestThread.started = true
 
@@ -382,7 +382,7 @@ class NodeToControllerRequestThreadTest {
     mockClient.prepareUnsupportedVersionResponse(request => request.apiKey == 
ApiKeys.METADATA)
 
     val testRequestThread = new NodeToControllerRequestThread(
-      mockClient, isNetworkClientForZkController = true, _ => mockClient, new 
ManualMetadataUpdater(),
+      mockClient,   new ManualMetadataUpdater(),
       controllerNodeProvider, config, time, "", retryTimeoutMs = Long.MaxValue)
     testRequestThread.started = true
 
@@ -420,7 +420,7 @@ class NodeToControllerRequestThreadTest {
     mockClient.createPendingAuthenticationError(activeController, 50)
 
     val testRequestThread = new NodeToControllerRequestThread(
-      mockClient, isNetworkClientForZkController = true, _ => mockClient, new 
ManualMetadataUpdater(),
+      mockClient,  new ManualMetadataUpdater(),
       controllerNodeProvider, config, time, "", retryTimeoutMs = Long.MaxValue)
     testRequestThread.started = true
 
@@ -443,7 +443,7 @@ class NodeToControllerRequestThreadTest {
     
when(controllerNodeProvider.getControllerInfo()).thenReturn(emptyControllerInfo)
 
     val testRequestThread = new NodeToControllerRequestThread(
-      mockClient, isNetworkClientForZkController = true, _ => mockClient, new 
ManualMetadataUpdater(),
+      mockClient,   new ManualMetadataUpdater(),
       controllerNodeProvider, config, time, "", retryTimeoutMs = Long.MaxValue)
 
     val completionHandler = new TestControllerRequestCompletionHandler(None)
diff --git 
a/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala
index 5456ab1f69d..7d40f34fd9d 100644
--- a/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala
@@ -57,10 +57,8 @@ class BrokerRegistrationRequestTest {
 
         val saslMechanism: String = ""
 
-        def isZkController: Boolean = false
-
         override def getControllerInfo(): ControllerInformation =
-          ControllerInformation(node, listenerName, securityProtocol, 
saslMechanism, isZkController)
+          ControllerInformation(node, listenerName, securityProtocol, 
saslMechanism)
       },
       Time.SYSTEM,
       new Metrics(),
diff --git a/core/src/test/scala/unit/kafka/server/ForwardingManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ForwardingManagerTest.scala
index 26e91d4d3db..d2d8d3e0382 100644
--- a/core/src/test/scala/unit/kafka/server/ForwardingManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ForwardingManagerTest.scala
@@ -66,11 +66,11 @@ class ForwardingManagerTest {
   }
 
   private def controllerInfo = {
-    ControllerInformation(Some(new Node(0, "host", 1234)), new 
ListenerName(""), SecurityProtocol.PLAINTEXT, "", isZkController = true)
+    ControllerInformation(Some(new Node(0, "host", 1234)), new 
ListenerName(""), SecurityProtocol.PLAINTEXT, "")
   }
 
   private def emptyControllerInfo = {
-    ControllerInformation(None, new ListenerName(""), 
SecurityProtocol.PLAINTEXT, "", isZkController = true)
+    ControllerInformation(None, new ListenerName(""), 
SecurityProtocol.PLAINTEXT, "")
   }
 
   @Test
diff --git 
a/core/src/test/scala/unit/kafka/server/RegistrationTestContext.scala 
b/core/src/test/scala/unit/kafka/server/RegistrationTestContext.scala
index 8f00f0aa1c1..9bf4d4d7e00 100644
--- a/core/src/test/scala/unit/kafka/server/RegistrationTestContext.scala
+++ b/core/src/test/scala/unit/kafka/server/RegistrationTestContext.scala
@@ -43,7 +43,7 @@ class SimpleControllerNodeProvider extends 
ControllerNodeProvider {
   def saslMechanism: String = SaslConfigs.DEFAULT_SASL_MECHANISM
 
   override def getControllerInfo(): ControllerInformation = 
ControllerInformation(Option(node.get()),
-    listenerName, securityProtocol, saslMechanism, isZkController = false)
+    listenerName, securityProtocol, saslMechanism)
 }
 
 class RegistrationTestContext(
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/util/InterBrokerSendThread.java
 
b/server-common/src/main/java/org/apache/kafka/server/util/InterBrokerSendThread.java
index 1ef60a547bc..093946eb5f0 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/util/InterBrokerSendThread.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/util/InterBrokerSendThread.java
@@ -43,13 +43,13 @@ import java.util.Set;
  */
 public abstract class InterBrokerSendThread extends ShutdownableThread {
 
-    protected volatile KafkaClient networkClient;
+    protected final KafkaClient networkClient;
 
     private final int requestTimeoutMs;
     private final Time time;
     private final UnsentRequests unsentRequests;
 
-    public InterBrokerSendThread(
+    protected InterBrokerSendThread(
         String name,
         KafkaClient networkClient,
         int requestTimeoutMs,
@@ -58,7 +58,7 @@ public abstract class InterBrokerSendThread extends 
ShutdownableThread {
         this(name, networkClient, requestTimeoutMs, time, true);
     }
 
-    public InterBrokerSendThread(
+    protected InterBrokerSendThread(
         String name,
         KafkaClient networkClient,
         int requestTimeoutMs,

Reply via email to