This is an automated email from the ASF dual-hosted git repository. rsivaram pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 8df96a4 MINOR: Reduce ZK reads and ensure ZK watch is set for listener update (#4670) 8df96a4 is described below commit 8df96a4119a5d46372eecdceb916f80dd073338a Author: Rajini Sivaram <rajinisiva...@googlemail.com> AuthorDate: Sat Mar 10 15:57:05 2018 +0000 MINOR: Reduce ZK reads and ensure ZK watch is set for listener update (#4670) Ensures that ZK watch is set for each live broker for listener update notifications in the controller. Also avoids reading all brokers from ZooKeeper when a broker metadata is modified by passing in brokerId to BrokerModifications and reading only the updated broker. The existing listener update test verifies both these changes. Earlier, the test did not detect missing watch for the last broker since metadata of all brokers were read from ZK (adding a watch for all) when any broker was updated. Reviewers: Jun Rao <jun...@gmail.com> --- .../scala/kafka/controller/KafkaController.scala | 28 +++++++++------------- 1 file changed, 11 insertions(+), 17 deletions(-) diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index a8707ad..ed2fb90 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -376,7 +376,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti debug(s"Register BrokerModifications handler for $brokerIds") brokerIds.foreach { brokerId => val brokerModificationsHandler = new BrokerModificationsHandler(this, eventManager, brokerId) - zkClient.registerZNodeChangeHandler(brokerModificationsHandler) + zkClient.registerZNodeChangeHandlerAndCheckExistence(brokerModificationsHandler) brokerModificationsHandlers.put(brokerId, brokerModificationsHandler) } } @@ -404,8 +404,8 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti unregisterBrokerModificationsHandler(deadBrokers) } - private def onBrokerUpdate(updatedBrokers: Seq[Int]) { - info(s"Broker info update callback for ${updatedBrokers.mkString(",")}") + private def onBrokerUpdate(updatedBrokerId: Int) { + info(s"Broker info update callback for $updatedBrokerId") sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq) } @@ -1244,25 +1244,19 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti } } - case object BrokerModifications extends ControllerEvent { + case class BrokerModifications(brokerId: Int) extends ControllerEvent { override def state: ControllerState = ControllerState.BrokerChange override def process(): Unit = { if (!isActive) return - val curBrokers = zkClient.getAllBrokersInCluster.toSet - val updatedBrokers = controllerContext.liveBrokers.filter { broker => - val existingBroker = curBrokers.find(_.id == broker.id) - existingBroker match { - case Some(b) => broker.endPoints != b.endPoints - case None => false - } - } - if (updatedBrokers.nonEmpty) { - val updatedBrokerIdsSorted = updatedBrokers.map(_.id).toSeq.sorted - info(s"Updated brokers: $updatedBrokers") + val newMetadata = zkClient.getBroker(brokerId) + val oldMetadata = controllerContext.liveBrokers.find(_.id == brokerId) + if (newMetadata.nonEmpty && oldMetadata.nonEmpty && newMetadata.map(_.endPoints) != oldMetadata.map(_.endPoints)) { + info(s"Updated broker: ${newMetadata.get}") + val curBrokers = controllerContext.liveBrokers -- oldMetadata ++ newMetadata controllerContext.liveBrokers = curBrokers // Update broker metadata - onBrokerUpdate(updatedBrokerIdsSorted) + onBrokerUpdate(brokerId) } } } @@ -1525,7 +1519,7 @@ class BrokerModificationsHandler(controller: KafkaController, eventManager: Cont override val path: String = BrokerIdZNode.path(brokerId) override def handleDataChange(): Unit = { - eventManager.put(controller.BrokerModifications) + eventManager.put(controller.BrokerModifications(brokerId)) } } -- To stop receiving notification emails like this one, please contact rsiva...@apache.org.