cmccabe commented on code in PR #12961:
URL: https://github.com/apache/kafka/pull/12961#discussion_r1042763929
##########
core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala:
##########
@@ -164,25 +184,63 @@ class BrokerToControllerChannelManagerImpl(
private val logContext = new LogContext(s"[BrokerToControllerChannelManager broker=${config.brokerId} name=$channelName] ")
private val manualMetadataUpdater = new ManualMetadataUpdater()
private val apiVersions = new ApiVersions()
- private val requestThread = newRequestThread
+ @volatile private var isZkControllerThread = true
+ @volatile private var requestThread = newRequestThread()
+ @volatile private var shutdownStarted = false
def start(): Unit = {
requestThread.start()
+ maybeScheduleReinitializeRequestThread()
}
def shutdown(): Unit = {
requestThread.shutdown()
+ shutdownStarted = false
info(s"Broker to controller channel manager for $channelName shutdown")
}
- private[server] def newRequestThread = {
+ def maybeScheduleReinitializeRequestThread(): Unit = {
+ // If migration is enabled for zkBroker, then we might see controller change from zk to kraft
+ // and vice-versa. This periodic task takes care of setting the right channel when such
+ // controller change is noticed.
+ if (config.migrationEnabled && config.requiresZookeeper) {
Review Comment:
Shouldn't this just be something we check in the `InterBrokerSendThread`
itself? Having a separate thread to do this seems very messy.
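
To illustrate the alternative, here is a minimal sketch of re-checking the controller type inside the send thread's own work loop. It is not the real `InterBrokerSendThread` API: `ControllerKind`, `SketchSendThread`, and the `currentKind` lookup are hypothetical names introduced only for this example.

```scala
// Hypothetical sketch: every name here is invented for illustration.
// Only the idea (check inside the send loop) comes from the review comment.
sealed trait ControllerKind
case object ZkController extends ControllerKind
case object KRaftController extends ControllerKind

// Stand-in for the request thread. `currentKind` is an assumed lookup
// (for example, backed by the metadata cache), not a real Kafka call.
class SketchSendThread(currentKind: () => ControllerKind) {
  private var activeKind: ControllerKind = currentKind()
  private var reinitCount = 0

  // Called once per loop iteration, analogous to a doWork() method.
  def doWork(): Unit = {
    val observed = currentKind()
    if (observed != activeKind) {
      // Switch the channel in place instead of replacing the whole thread
      // from a separate scheduled task.
      activeKind = observed
      reinitCount += 1
    }
    // ... build and send requests over the channel for activeKind ...
  }

  def kind: ControllerKind = activeKind
  def reinitializations: Int = reinitCount
}
```

Because the check runs on the same thread that sends requests, this shape would not need a `@volatile var requestThread` or a second scheduler, assuming the controller type can be read cheaply on each iteration.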