hachikuji commented on code in PR #12776:
URL: https://github.com/apache/kafka/pull/12776#discussion_r1017238973


##########
core/src/main/scala/kafka/server/KafkaRaftServer.scala:
##########
@@ -69,95 +64,54 @@ class KafkaRaftServer(
   private val controllerQuorumVotersFuture = CompletableFuture.completedFuture(
     RaftConfig.parseVoterConnections(config.quorumVoters))
 
-  private val raftManager = new KafkaRaftManager[ApiMessageAndVersion](
-    metaProps,
+  private val jointServer = new JointServer(
     config,
-    new MetadataRecordSerde,
-    KafkaRaftServer.MetadataPartition,
-    KafkaRaftServer.MetadataTopicId,
+    metaProps,
     time,
     metrics,
     threadNamePrefix,
-    controllerQuorumVotersFuture
+    controllerQuorumVotersFuture,
+    new StandardFaultHandlerFactory(),
   )
 
   private val broker: Option[BrokerServer] = if 
(config.processRoles.contains(BrokerRole)) {
-    val brokerMetrics = BrokerServerMetrics(metrics)
-    val fatalFaultHandler = new ProcessExitingFaultHandler()
-    val metadataLoadingFaultHandler = new LoggingFaultHandler("metadata 
loading",
-        () => brokerMetrics.metadataLoadErrorCount.getAndIncrement())
-    val metadataApplyingFaultHandler = new LoggingFaultHandler("metadata 
application",
-      () => brokerMetrics.metadataApplyErrorCount.getAndIncrement())
     Some(new BrokerServer(
-      config,
-      metaProps,
-      raftManager,
-      time,
-      metrics,
-      brokerMetrics,
-      threadNamePrefix,
+      jointServer,
       offlineDirs,
-      controllerQuorumVotersFuture,
-      fatalFaultHandler,
-      metadataLoadingFaultHandler,
-      metadataApplyingFaultHandler
     ))
   } else {
     None
   }
 
   private val controller: Option[ControllerServer] = if 
(config.processRoles.contains(ControllerRole)) {
-    val controllerMetrics = new 
QuorumControllerMetrics(KafkaYammerMetrics.defaultRegistry(), time)
-    val metadataFaultHandler = new LoggingFaultHandler("controller metadata",
-      () => controllerMetrics.incrementMetadataErrorCount())
-    val fatalFaultHandler = new ProcessExitingFaultHandler()
     Some(new ControllerServer(
-      metaProps,
-      config,
-      raftManager,
-      time,
-      metrics,
-      controllerMetrics,
-      threadNamePrefix,
-      controllerQuorumVotersFuture,
+      jointServer,
       KafkaRaftServer.configSchema,
-      raftManager.apiVersions,
       bootstrapMetadata,
-      metadataFaultHandler,
-      fatalFaultHandler
     ))
   } else {
     None
   }
 
   override def startup(): Unit = {
     Mx4jLoader.maybeLoad()
-    // Note that we startup `RaftManager` first so that the controller and 
broker
-    // can register listeners during initialization.
-    raftManager.startup()
     controller.foreach(_.startup())
     broker.foreach(_.startup())
     AppInfoParser.registerAppInfo(Server.MetricsPrefix, 
config.brokerId.toString, metrics, time.milliseconds())
     info(KafkaBroker.STARTED_MESSAGE)
   }
 
   override def shutdown(): Unit = {
+    // The last component to be shut down will stop JointServer.
     broker.foreach(_.shutdown())
-    // The order of shutdown for `RaftManager` and `ControllerServer` is 
backwards

Review Comment:
   It looks like we lost this in the updated implementation? The socket server 
of the controller is shutdown before `JointServer`. By the time we get to 
`RaftManager` shutdown, the socket server is down and we cannot do graceful 
resignation from the quorum anymore.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to