This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8df96a4  MINOR: Reduce ZK reads and ensure ZK watch is set for 
listener update (#4670)
8df96a4 is described below

commit 8df96a4119a5d46372eecdceb916f80dd073338a
Author: Rajini Sivaram <rajinisiva...@googlemail.com>
AuthorDate: Sat Mar 10 15:57:05 2018 +0000

    MINOR: Reduce ZK reads and ensure ZK watch is set for listener update 
(#4670)
    
    Ensures that ZK watch is set for each live broker for listener update 
notifications in the controller. Also avoids reading all brokers from ZooKeeper 
when a broker metadata is modified by passing in brokerId to 
BrokerModifications and reading only the updated broker.
    
    The existing listener update test verifies both these changes. Earlier, the 
test did not detect missing watch for the last broker since metadata of all 
brokers were read from ZK (adding a watch for all) when any broker was updated.
    
    Reviewers: Jun Rao <jun...@gmail.com>
---
 .../scala/kafka/controller/KafkaController.scala   | 28 +++++++++-------------
 1 file changed, 11 insertions(+), 17 deletions(-)

diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala 
b/core/src/main/scala/kafka/controller/KafkaController.scala
index a8707ad..ed2fb90 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -376,7 +376,7 @@ class KafkaController(val config: KafkaConfig, zkClient: 
KafkaZkClient, time: Ti
     debug(s"Register BrokerModifications handler for $brokerIds")
     brokerIds.foreach { brokerId =>
       val brokerModificationsHandler = new BrokerModificationsHandler(this, 
eventManager, brokerId)
-      zkClient.registerZNodeChangeHandler(brokerModificationsHandler)
+      
zkClient.registerZNodeChangeHandlerAndCheckExistence(brokerModificationsHandler)
       brokerModificationsHandlers.put(brokerId, brokerModificationsHandler)
     }
   }
@@ -404,8 +404,8 @@ class KafkaController(val config: KafkaConfig, zkClient: 
KafkaZkClient, time: Ti
     unregisterBrokerModificationsHandler(deadBrokers)
   }
 
-  private def onBrokerUpdate(updatedBrokers: Seq[Int]) {
-    info(s"Broker info update callback for ${updatedBrokers.mkString(",")}")
+  private def onBrokerUpdate(updatedBrokerId: Int) {
+    info(s"Broker info update callback for $updatedBrokerId")
     
sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
   }
 
@@ -1244,25 +1244,19 @@ class KafkaController(val config: KafkaConfig, 
zkClient: KafkaZkClient, time: Ti
     }
   }
 
-  case object BrokerModifications extends ControllerEvent {
+  case class BrokerModifications(brokerId: Int) extends ControllerEvent {
     override def state: ControllerState = ControllerState.BrokerChange
 
     override def process(): Unit = {
       if (!isActive) return
-      val curBrokers = zkClient.getAllBrokersInCluster.toSet
-      val updatedBrokers = controllerContext.liveBrokers.filter { broker =>
-        val existingBroker = curBrokers.find(_.id == broker.id)
-        existingBroker match {
-          case Some(b) => broker.endPoints != b.endPoints
-          case None => false
-        }
-      }
-      if (updatedBrokers.nonEmpty) {
-        val updatedBrokerIdsSorted = updatedBrokers.map(_.id).toSeq.sorted
-        info(s"Updated brokers: $updatedBrokers")
+      val newMetadata = zkClient.getBroker(brokerId)
+      val oldMetadata = controllerContext.liveBrokers.find(_.id == brokerId)
+      if (newMetadata.nonEmpty && oldMetadata.nonEmpty && 
newMetadata.map(_.endPoints) != oldMetadata.map(_.endPoints)) {
+        info(s"Updated broker: ${newMetadata.get}")
 
+        val curBrokers = controllerContext.liveBrokers -- oldMetadata ++ 
newMetadata
         controllerContext.liveBrokers = curBrokers // Update broker metadata
-        onBrokerUpdate(updatedBrokerIdsSorted)
+        onBrokerUpdate(brokerId)
       }
     }
   }
@@ -1525,7 +1519,7 @@ class BrokerModificationsHandler(controller: 
KafkaController, eventManager: Cont
   override val path: String = BrokerIdZNode.path(brokerId)
 
   override def handleDataChange(): Unit = {
-    eventManager.put(controller.BrokerModifications)
+    eventManager.put(controller.BrokerModifications(brokerId))
   }
 }
 

-- 
To stop receiving notification emails like this one, please contact
rsiva...@apache.org.

Reply via email to