mumrah commented on code in PR #12961:
URL: https://github.com/apache/kafka/pull/12961#discussion_r1042322800


##########
core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala:
##########
@@ -37,42 +38,53 @@ import scala.collection.Seq
 import scala.compat.java8.OptionConverters._
 import scala.jdk.CollectionConverters._
 
+case class ControllerInformation(node: Option[Node], listenerName: 
ListenerName,

Review Comment:
   style nit: one param per line



##########
core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala:
##########
@@ -164,25 +182,54 @@ class BrokerToControllerChannelManagerImpl(
   private val logContext = new LogContext(s"[BrokerToControllerChannelManager 
broker=${config.brokerId} name=$channelName] ")
   private val manualMetadataUpdater = new ManualMetadataUpdater()
   private val apiVersions = new ApiVersions()
-  private val requestThread = newRequestThread
+  private var isZkControllerThread = true
+  @volatile private var requestThread = newRequestThread
 
   def start(): Unit = {
     requestThread.start()
+    // If migration is enabled for zkBroker, then we might see controller 
change from zk to kraft
+    // and vice-versa. This periodic task takes care of setting the right 
channel when such
+    // controller change is noticed.
+    if (config.migrationEnabled && config.requiresZookeeper) {
+      kafkaScheduler.schedule("maybeReinitializeRequestThread",
+        () => maybeReinitializeRequestThread(), 0L, 500, TimeUnit.MILLISECONDS)
+    }
   }
 
   def shutdown(): Unit = {
     requestThread.shutdown()
     info(s"Broker to controller channel manager for $channelName shutdown")
   }
 
+  def maybeReinitializeRequestThread(): Unit = {
+    val controllerInfo = controllerNodeProvider.getControllerInfo()
+    if (isZkControllerThread == controllerInfo.isZkController)
+      return
+    val oldRequestThread = requestThread
+    // Close the old thread and start a new one if old one is not already in 
the process of
+    // shutting down.
+    if (oldRequestThread.isRunning) {

Review Comment:
   If the old thread is not running, what would that mean here? 



##########
core/src/main/scala/kafka/server/KafkaServer.scala:
##########
@@ -544,6 +549,7 @@ class KafkaServer(
   private def controlledShutdown(): Unit = {
     val socketTimeoutMs = config.controllerSocketTimeoutMs
 
+    // TODO: Handle controlled shutdown for zkBroker when we have KRaft 
controller.

Review Comment:
   I just filed https://issues.apache.org/jira/browse/KAFKA-14447. Can you add 
a comment here referencing it?



##########
core/src/main/scala/kafka/server/KafkaConfig.scala:
##########
@@ -1665,6 +1665,7 @@ class KafkaConfig private(doLog: Boolean, val props: 
java.util.Map[_, _], dynami
   def usesSelfManagedQuorum: Boolean = processRoles.nonEmpty
 
   val migrationEnabled: Boolean = getBoolean(KafkaConfig.MigrationEnabledProp)
+  def enableZkApiForwarding: Boolean = migrationEnabled && 
interBrokerProtocolVersion.isApiForwardingSupported

Review Comment:
   Good question @dengziming. We said in the KIP that forwarding is enabled in 
all cases with 3.4-IV0, though technically we only need to enable it if we're 
doing a migration. It might be safer to keep forwarding off when we haven't 
enabled the migration. Since we won't start the migration until all brokers 
have come up with `zookeeper.metadata.migration.enable` set, I think leaving it 
off like this will be okay.



##########
core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala:
##########
@@ -164,25 +182,54 @@ class BrokerToControllerChannelManagerImpl(
   private val logContext = new LogContext(s"[BrokerToControllerChannelManager 
broker=${config.brokerId} name=$channelName] ")
   private val manualMetadataUpdater = new ManualMetadataUpdater()
   private val apiVersions = new ApiVersions()
-  private val requestThread = newRequestThread
+  private var isZkControllerThread = true
+  @volatile private var requestThread = newRequestThread
 
   def start(): Unit = {
     requestThread.start()
+    // If migration is enabled for zkBroker, then we might see controller 
change from zk to kraft
+    // and vice-versa. This periodic task takes care of setting the right 
channel when such
+    // controller change is noticed.
+    if (config.migrationEnabled && config.requiresZookeeper) {
+      kafkaScheduler.schedule("maybeReinitializeRequestThread",
+        () => maybeReinitializeRequestThread(), 0L, 500, TimeUnit.MILLISECONDS)
+    }
   }
 
   def shutdown(): Unit = {
     requestThread.shutdown()
     info(s"Broker to controller channel manager for $channelName shutdown")
   }
 
+  def maybeReinitializeRequestThread(): Unit = {
+    val controllerInfo = controllerNodeProvider.getControllerInfo()
+    if (isZkControllerThread == controllerInfo.isZkController)
+      return
+    val oldRequestThread = requestThread
+    // Close the old thread and start a new one if old one is not already in 
the process of
+    // shutting down.
+    if (oldRequestThread.isRunning) {
+      val newThread = newRequestThread
+      newThread.start()
+      requestThread = newThread
+      // Shutdown the request thread. This fails the inflight requests to be 
failed with
+      // disconnection exception.
+      oldRequestThread.shutdown()
+      // There can still be unsent requests in the queue which need to be 
cleaned.
+      oldRequestThread.failAllUnsentRequests()
+    }
+  }
+
   private[server] def newRequestThread = {

Review Comment:
   Can we pass the `ControllerInfo` in here instead of re-reading it?



##########
core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala:
##########
@@ -164,25 +182,54 @@ class BrokerToControllerChannelManagerImpl(
   private val logContext = new LogContext(s"[BrokerToControllerChannelManager 
broker=${config.brokerId} name=$channelName] ")
   private val manualMetadataUpdater = new ManualMetadataUpdater()
   private val apiVersions = new ApiVersions()
-  private val requestThread = newRequestThread
+  private var isZkControllerThread = true

Review Comment:
   Should this be volatile as well? 



##########
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##########
@@ -243,6 +243,10 @@ public boolean isNoOpRecordSupported() {
         return this.isAtLeast(IBP_3_3_IV1);
     }
 
+    public boolean isApiForwardingSupported() {

Review Comment:
   nit: Should we say "isApiForwardingEnabled" since we require API forwarding 
at this version?



##########
core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala:
##########
@@ -315,7 +324,35 @@ class ZkMetadataCache(brokerId: Int, metadataVersion: 
MetadataVersion, brokerFea
     }.getOrElse(Map.empty[Int, Node])
   }
 
-  def getControllerId: Option[Int] = metadataSnapshot.controllerId
+  def getControllerId: Option[Int] = {
+    val snapshot = metadataSnapshot
+    snapshot.controllerId.orElse(snapshot.kraftControllerId)
+  }
+
+  /**
+   * Return controller id. Additionally returns if the controller is zk or 
kraft controller.
+   * Returns true for zk controller, false otherwise.
+   * @return
+   */
+  def getZkOrKRaftControllerId: (Option[Int], Boolean) = {
+    val snapshot = metadataSnapshot
+    snapshot.kraftControllerId match {
+      case Some(controller) => (Some(controller), false)
+      case None => (snapshot.controllerId, true)
+    }
+  }
+
+  override def getControllerIdForExternalClient: Option[Int] = {

Review Comment:
   Can you add a javadoc here explaining the behavior (i.e., that it returns a 
random alive broker)?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to