mumrah commented on code in PR #12961:
URL: https://github.com/apache/kafka/pull/12961#discussion_r1042322800
##########
core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala:
##########
@@ -37,42 +38,53 @@ import scala.collection.Seq
import scala.compat.java8.OptionConverters._
import scala.jdk.CollectionConverters._
+case class ControllerInformation(node: Option[Node], listenerName:
ListenerName,
Review Comment:
style nit: one param per line
##########
core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala:
##########
@@ -164,25 +182,54 @@ class BrokerToControllerChannelManagerImpl(
private val logContext = new LogContext(s"[BrokerToControllerChannelManager
broker=${config.brokerId} name=$channelName] ")
private val manualMetadataUpdater = new ManualMetadataUpdater()
private val apiVersions = new ApiVersions()
- private val requestThread = newRequestThread
+ private var isZkControllerThread = true
+ @volatile private var requestThread = newRequestThread
def start(): Unit = {
requestThread.start()
+ // If migration is enabled for zkBroker, then we might see controller
change from zk to kraft
+ // and vice-versa. This periodic task takes care of setting the right
channel when such
+ // controller change is noticed.
+ if (config.migrationEnabled && config.requiresZookeeper) {
+ kafkaScheduler.schedule("maybeReinitializeRequestThread",
+ () => maybeReinitializeRequestThread(), 0L, 500, TimeUnit.MILLISECONDS)
+ }
}
def shutdown(): Unit = {
requestThread.shutdown()
info(s"Broker to controller channel manager for $channelName shutdown")
}
+ def maybeReinitializeRequestThread(): Unit = {
+ val controllerInfo = controllerNodeProvider.getControllerInfo()
+ if (isZkControllerThread == controllerInfo.isZkController)
+ return
+ val oldRequestThread = requestThread
+ // Close the old thread and start a new one if old one is not already in
the process of
+ // shutting down.
+ if (oldRequestThread.isRunning) {
Review Comment:
If the old thread is not running, what would that mean here?
##########
core/src/main/scala/kafka/server/KafkaServer.scala:
##########
@@ -544,6 +549,7 @@ class KafkaServer(
private def controlledShutdown(): Unit = {
val socketTimeoutMs = config.controllerSocketTimeoutMs
+ // TODO: Handle controlled shutdown for zkBroker when we have KRaft
controller.
Review Comment:
I just filed https://issues.apache.org/jira/browse/KAFKA-14447. Can you
reference it in the TODO comment here?
##########
core/src/main/scala/kafka/server/KafkaConfig.scala:
##########
@@ -1665,6 +1665,7 @@ class KafkaConfig private(doLog: Boolean, val props:
java.util.Map[_, _], dynami
def usesSelfManagedQuorum: Boolean = processRoles.nonEmpty
val migrationEnabled: Boolean = getBoolean(KafkaConfig.MigrationEnabledProp)
+ def enableZkApiForwarding: Boolean = migrationEnabled &&
interBrokerProtocolVersion.isApiForwardingSupported
Review Comment:
Good question @dengziming. We said in the KIP that forwarding is enabled in
all cases with 3.4-IV0, though technically we only need to enable it if we're
doing a migration. It might be safer to keep forwarding off when we haven't
enabled the migration. Since we won't start the migration until all brokers
come up with `zookeeper.metadata.migration.enable`, I think leaving it off like
this will be okay.
##########
core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala:
##########
@@ -164,25 +182,54 @@ class BrokerToControllerChannelManagerImpl(
private val logContext = new LogContext(s"[BrokerToControllerChannelManager
broker=${config.brokerId} name=$channelName] ")
private val manualMetadataUpdater = new ManualMetadataUpdater()
private val apiVersions = new ApiVersions()
- private val requestThread = newRequestThread
+ private var isZkControllerThread = true
+ @volatile private var requestThread = newRequestThread
def start(): Unit = {
requestThread.start()
+ // If migration is enabled for zkBroker, then we might see controller
change from zk to kraft
+ // and vice-versa. This periodic task takes care of setting the right
channel when such
+ // controller change is noticed.
+ if (config.migrationEnabled && config.requiresZookeeper) {
+ kafkaScheduler.schedule("maybeReinitializeRequestThread",
+ () => maybeReinitializeRequestThread(), 0L, 500, TimeUnit.MILLISECONDS)
+ }
}
def shutdown(): Unit = {
requestThread.shutdown()
info(s"Broker to controller channel manager for $channelName shutdown")
}
+ def maybeReinitializeRequestThread(): Unit = {
+ val controllerInfo = controllerNodeProvider.getControllerInfo()
+ if (isZkControllerThread == controllerInfo.isZkController)
+ return
+ val oldRequestThread = requestThread
+ // Close the old thread and start a new one if old one is not already in
the process of
+ // shutting down.
+ if (oldRequestThread.isRunning) {
+ val newThread = newRequestThread
+ newThread.start()
+ requestThread = newThread
+ // Shutdown the request thread. This fails the inflight requests to be
failed with
+ // disconnection exception.
+ oldRequestThread.shutdown()
+ // There can still be unsent requests in the queue which need to be
cleaned.
+ oldRequestThread.failAllUnsentRequests()
+ }
+ }
+
private[server] def newRequestThread = {
Review Comment:
Can we pass the `ControllerInformation` in here instead of re-reading it?
##########
core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala:
##########
@@ -164,25 +182,54 @@ class BrokerToControllerChannelManagerImpl(
private val logContext = new LogContext(s"[BrokerToControllerChannelManager
broker=${config.brokerId} name=$channelName] ")
private val manualMetadataUpdater = new ManualMetadataUpdater()
private val apiVersions = new ApiVersions()
- private val requestThread = newRequestThread
+ private var isZkControllerThread = true
Review Comment:
Should this be volatile as well?
##########
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##########
@@ -243,6 +243,10 @@ public boolean isNoOpRecordSupported() {
return this.isAtLeast(IBP_3_3_IV1);
}
+ public boolean isApiForwardingSupported() {
Review Comment:
nit: Should we say "isApiForwardingEnabled" since we require API forwarding
at this version?
##########
core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala:
##########
@@ -315,7 +324,35 @@ class ZkMetadataCache(brokerId: Int, metadataVersion:
MetadataVersion, brokerFea
}.getOrElse(Map.empty[Int, Node])
}
- def getControllerId: Option[Int] = metadataSnapshot.controllerId
+ def getControllerId: Option[Int] = {
+ val snapshot = metadataSnapshot
+ snapshot.controllerId.orElse(snapshot.kraftControllerId)
+ }
+
+ /**
+ * Return controller id. Additionally returns if the controller is zk or
kraft controller.
+ * Returns true for zk controller, false otherwise.
+ * @return
+ */
+ def getZkOrKRaftControllerId: (Option[Int], Boolean) = {
+ val snapshot = metadataSnapshot
+ snapshot.kraftControllerId match {
+ case Some(controller) => (Some(controller), false)
+ case None => (snapshot.controllerId, true)
+ }
+ }
+
+ override def getControllerIdForExternalClient: Option[Int] = {
Review Comment:
Can you add a Scaladoc comment here explaining the behavior (i.e., random
alive broker)?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]