akhileshchg commented on code in PR #12961:
URL: https://github.com/apache/kafka/pull/12961#discussion_r1042776473
##########
clients/src/main/resources/common/message/EnvelopeRequest.json:
##########
@@ -16,10 +16,11 @@
{
"apiKey": 58,
"type": "request",
- "listeners": ["controller"],
+ "listeners": ["controller", "zkBroker"],
"name": "EnvelopeRequest",
// Request struct for forwarding.
- "validVersions": "0",
+ // Version 1 adds the envelope request support to ZK brokers as well.
+ "validVersions": "0-1",
Review Comment:
That makes sense. Let me revert the version to 0.
##########
core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala:
##########
@@ -37,42 +38,55 @@ import scala.collection.Seq
import scala.compat.java8.OptionConverters._
import scala.jdk.CollectionConverters._
+case class ControllerInformation(node: Option[Node],
+ listenerName: ListenerName,
+ securityProtocol: SecurityProtocol,
+ saslMechanism: String,
+ isZkController: Boolean)
+
trait ControllerNodeProvider {
def get(): Option[Node]
- def listenerName: ListenerName
- def securityProtocol: SecurityProtocol
- def saslMechanism: String
-}
-
-object MetadataCacheControllerNodeProvider {
- def apply(
- config: KafkaConfig,
- metadataCache: kafka.server.MetadataCache
- ): MetadataCacheControllerNodeProvider = {
- val listenerName = config.controlPlaneListenerName
- .getOrElse(config.interBrokerListenerName)
-
- val securityProtocol = config.controlPlaneSecurityProtocol
- .getOrElse(config.interBrokerSecurityProtocol)
-
- new MetadataCacheControllerNodeProvider(
- metadataCache,
- listenerName,
- securityProtocol,
- config.saslMechanismInterBrokerProtocol
- )
- }
+ def getControllerInfo(): ControllerInformation
}
class MetadataCacheControllerNodeProvider(
- val metadataCache: kafka.server.MetadataCache,
- val listenerName: ListenerName,
- val securityProtocol: SecurityProtocol,
- val saslMechanism: String
+ val metadataCache: ZkMetadataCache,
+ val config: KafkaConfig
) extends ControllerNodeProvider {
+
+ def listenerName(isZkController: Boolean): ListenerName = {
Review Comment:
Whether this is part of the metadata cache or the node provider, we still need
to determine the listener name, security protocol, and SASL mechanism needed to
communicate with the controller, depending on whether we're trying to connect
to the ZK controller or the KRaft controller. I am not sure I completely
understood your comment.
##########
core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala:
##########
@@ -164,25 +184,63 @@ class BrokerToControllerChannelManagerImpl(
private val logContext = new LogContext(s"[BrokerToControllerChannelManager
broker=${config.brokerId} name=$channelName] ")
private val manualMetadataUpdater = new ManualMetadataUpdater()
private val apiVersions = new ApiVersions()
- private val requestThread = newRequestThread
+ @volatile private var isZkControllerThread = true
+ @volatile private var requestThread = newRequestThread()
+ @volatile private var shutdownStarted = false
def start(): Unit = {
requestThread.start()
+ maybeScheduleReinitializeRequestThread()
}
def shutdown(): Unit = {
requestThread.shutdown()
+ shutdownStarted = false
info(s"Broker to controller channel manager for $channelName shutdown")
}
- private[server] def newRequestThread = {
+ def maybeScheduleReinitializeRequestThread(): Unit = {
+ // If migration is enabled for zkBroker, then we might see controller
change from zk to kraft
+ // and vice-versa. This periodic task takes care of setting the right
channel when such
+ // controller change is noticed.
+ if (config.migrationEnabled && config.requiresZookeeper) {
Review Comment:
I tried to swap out just the NetworkClient in InterBrokerSendThread. Let me
know how it looks now.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]