Repository: kafka
Updated Branches:
  refs/heads/trunk 640f3b05e -> 62fcaedd6


KAFKA-1355; Avoid sending all topic metadata on state changes. Reviewed by Neha Narkhede and Timothy Chen


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/62fcaedd
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/62fcaedd
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/62fcaedd

Branch: refs/heads/trunk
Commit: 62fcaedd66c46d8c15e1fd1725badcd575b10849
Parents: 640f3b0
Author: Joel Koshy <[email protected]>
Authored: Mon Apr 7 16:24:14 2014 -0700
Committer: Joel Koshy <[email protected]>
Committed: Mon Apr 7 16:24:14 2014 -0700

----------------------------------------------------------------------
 .../scala/kafka/api/LeaderAndIsrRequest.scala   |  2 +
 .../controller/ControllerChannelManager.scala   | 55 +++++++++++---------
 .../kafka/controller/KafkaController.scala      |  8 +--
 .../kafka/controller/TopicDeletionManager.scala | 17 +++---
 .../src/main/scala/kafka/server/KafkaApis.scala | 34 +++++-------
 5 files changed, 62 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/62fcaedd/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
index 0311737..3e40817 100644
--- a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
+++ b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
@@ -32,6 +32,8 @@ import collection.Set
 object LeaderAndIsr {
   val initialLeaderEpoch: Int = 0
   val initialZKVersion: Int = 0
+  val NoLeader = -1
+  val LeaderDuringDelete = -2
 }
 
 case class LeaderAndIsr(var leader: Int, var leaderEpoch: Int, var isr: 
List[Int], var zkVersion: Int) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/62fcaedd/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index f17d976..f79c1dc 100644
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -211,7 +211,8 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends  Logging
       leaderAndIsrRequestMap(brokerId).put((topic, partition),
         PartitionStateInfo(leaderIsrAndControllerEpoch, replicas.toSet))
     }
-    
addUpdateMetadataRequestForBrokers(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
+    
addUpdateMetadataRequestForBrokers(controllerContext.liveOrShuttingDownBrokerIds.toSeq,
+                                       Set(TopicAndPartition(topic, 
partition)))
   }
 
   def addStopReplicaRequestForBrokers(brokerIds: Seq[Int], topic: String, 
partition: Int, deletePartition: Boolean,
@@ -232,34 +233,40 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends  Logging
    *
    */
   def addUpdateMetadataRequestForBrokers(brokerIds: Seq[Int],
+                                         partitions: 
collection.Set[TopicAndPartition] = Set.empty[TopicAndPartition],
                                          callback: (RequestOrResponse) => Unit 
= null) {
-    val partitionList = 
controllerContext.partitionLeadershipInfo.keySet.dropWhile(
-      p => controller.deleteTopicManager.isTopicQueuedUpForDeletion(p.topic))
-    if(partitionList.size > 0) {
-      partitionList.foreach { partition =>
-        val leaderIsrAndControllerEpochOpt = 
controllerContext.partitionLeadershipInfo.get(partition)
-        leaderIsrAndControllerEpochOpt match {
-          case Some(leaderIsrAndControllerEpoch) =>
-            val replicas = 
controllerContext.partitionReplicaAssignment(partition).toSet
-            val partitionStateInfo = 
PartitionStateInfo(leaderIsrAndControllerEpoch, replicas)
-            brokerIds.filter(b => b >= 0).foreach { brokerId =>
-              updateMetadataRequestMap.getOrElseUpdate(brokerId, new 
mutable.HashMap[TopicAndPartition, PartitionStateInfo])
-              updateMetadataRequestMap(brokerId).put(partition, 
partitionStateInfo)
-            }
-          case None =>
-            info("Leader not assigned yet for partition %s. Skip sending 
udpate metadata request".format(partition))
-        }
-      }
-    } else {
-      if(controllerContext.partitionLeadershipInfo.keySet.size > 0) {
-        // last set of topics are being deleted
-        controllerContext.partitionLeadershipInfo.foreach { case(partition, 
leaderIsrAndControllerEpoch) =>
+    def updateMetadataRequestMapFor(partition: TopicAndPartition, 
beingDeleted: Boolean) {
+      val leaderIsrAndControllerEpochOpt = 
controllerContext.partitionLeadershipInfo.get(partition)
+      leaderIsrAndControllerEpochOpt match {
+        case Some(leaderIsrAndControllerEpoch) =>
+          val replicas = 
controllerContext.partitionReplicaAssignment(partition).toSet
+          val partitionStateInfo = if (beingDeleted) {
+            val leaderAndIsr = new 
LeaderAndIsr(LeaderAndIsr.LeaderDuringDelete, 
leaderIsrAndControllerEpoch.leaderAndIsr.isr)
+            PartitionStateInfo(LeaderIsrAndControllerEpoch(leaderAndIsr, 
leaderIsrAndControllerEpoch.controllerEpoch), replicas)
+          } else {
+            PartitionStateInfo(leaderIsrAndControllerEpoch, replicas)
+          }
           brokerIds.filter(b => b >= 0).foreach { brokerId =>
-            updateMetadataRequestMap.put(brokerId, new 
mutable.HashMap[TopicAndPartition, PartitionStateInfo])
+            updateMetadataRequestMap.getOrElseUpdate(brokerId, new 
mutable.HashMap[TopicAndPartition, PartitionStateInfo])
+            updateMetadataRequestMap(brokerId).put(partition, 
partitionStateInfo)
           }
-        }
+        case None =>
+          info("Leader not yet assigned for partition %s. Skip sending 
UpdateMetadataRequest.".format(partition))
       }
     }
+
+    val filteredPartitions = {
+      val givenPartitions = if (partitions.isEmpty)
+        controllerContext.partitionLeadershipInfo.keySet
+      else
+        partitions
+      if (controller.deleteTopicManager.partitionsToBeDeleted.isEmpty)
+        givenPartitions
+      else
+        givenPartitions -- controller.deleteTopicManager.partitionsToBeDeleted
+    }
+    filteredPartitions.foreach(partition => 
updateMetadataRequestMapFor(partition, beingDeleted = false))
+    controller.deleteTopicManager.partitionsToBeDeleted.foreach(partition => 
updateMetadataRequestMapFor(partition, beingDeleted = true))
   }
 
   def sendRequestsToBrokers(controllerEpoch: Int, correlationId: Int) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/62fcaedd/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index fcabd0d..c8c02ce 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -552,7 +552,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
         info("Removed partition %s from the list of reassigned partitions in 
zookeeper".format(topicAndPartition))
         controllerContext.partitionsBeingReassigned.remove(topicAndPartition)
         //12. After electing leader, the replicas and isr information changes, 
so resend the update metadata request to every broker
-        
sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
+        
sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, 
Set(topicAndPartition))
         // signal delete topic thread if reassignment for some partitions 
belonging to topics being deleted just completed
         
deleteTopicManager.resumeDeletionForTopics(Set(topicAndPartition.topic))
     }
@@ -933,9 +933,9 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
    * metadata requests
    * @param brokers The brokers that the update metadata request should be 
sent to
    */
-  def sendUpdateMetadataRequest(brokers: Seq[Int]) {
+  def sendUpdateMetadataRequest(brokers: Seq[Int], partitions: 
Set[TopicAndPartition] = Set.empty[TopicAndPartition]) {
     brokerRequestBatch.newBatch()
-    brokerRequestBatch.addUpdateMetadataRequestForBrokers(brokers)
+    brokerRequestBatch.addUpdateMetadataRequestForBrokers(brokers, partitions)
     brokerRequestBatch.sendRequestsToBrokers(epoch, 
controllerContext.correlationId.getAndIncrement)
   }
 
@@ -967,7 +967,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
               "controller was elected with epoch %d. Aborting state change by 
this controller".format(controllerEpoch))
           if (leaderAndIsr.isr.contains(replicaId)) {
             // if the replica to be removed from the ISR is also the leader, 
set the new leader value to -1
-            val newLeader = if(replicaId == leaderAndIsr.leader) -1 else 
leaderAndIsr.leader
+            val newLeader = if (replicaId == leaderAndIsr.leader) 
LeaderAndIsr.NoLeader else leaderAndIsr.leader
             var newIsr = leaderAndIsr.isr.filter(b => b != replicaId)
 
             // if the replica to be removed from the ISR is the last surviving 
member of the ISR and unclean leader election

http://git-wip-us.apache.org/repos/asf/kafka/blob/62fcaedd/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
index 09f54ac..d29e556 100644
--- a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
+++ b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
@@ -72,12 +72,13 @@ class TopicDeletionManager(controller: KafkaController,
   val controllerContext = controller.controllerContext
   val partitionStateMachine = controller.partitionStateMachine
   val replicaStateMachine = controller.replicaStateMachine
-  var topicsToBeDeleted: mutable.Set[String] = mutable.Set.empty[String] ++ 
initialTopicsToBeDeleted
+  val topicsToBeDeleted: mutable.Set[String] = mutable.Set.empty[String] ++ 
initialTopicsToBeDeleted
+  val partitionsToBeDeleted: mutable.Set[TopicAndPartition] = 
topicsToBeDeleted.flatMap(controllerContext.partitionsForTopic)
   val deleteLock = new ReentrantLock()
-  var topicsIneligibleForDeletion: mutable.Set[String] = 
mutable.Set.empty[String] ++
+  val topicsIneligibleForDeletion: mutable.Set[String] = 
mutable.Set.empty[String] ++
     (initialTopicsIneligibleForDeletion & initialTopicsToBeDeleted)
   val deleteTopicsCond = deleteLock.newCondition()
-  var deleteTopicStateChanged: AtomicBoolean = new AtomicBoolean(false)
+  val deleteTopicStateChanged: AtomicBoolean = new AtomicBoolean(false)
   var deleteTopicsThread: DeleteTopicsThread = null
   val isDeleteTopicEnabled = controller.config.deleteTopicEnable
 
@@ -99,6 +100,7 @@ class TopicDeletionManager(controller: KafkaController,
     if(isDeleteTopicEnabled) {
       deleteTopicsThread.shutdown()
       topicsToBeDeleted.clear()
+      partitionsToBeDeleted.clear()
       topicsIneligibleForDeletion.clear()
     }
   }
@@ -112,6 +114,7 @@ class TopicDeletionManager(controller: KafkaController,
   def enqueueTopicsForDeletion(topics: Set[String]) {
     if(isDeleteTopicEnabled) {
       topicsToBeDeleted ++= topics
+      partitionsToBeDeleted ++= 
topics.flatMap(controllerContext.partitionsForTopic)
       resumeTopicDeletionThread()
     }
   }
@@ -264,6 +267,7 @@ class TopicDeletionManager(controller: KafkaController,
     partitionStateMachine.handleStateChanges(partitionsForDeletedTopic, 
OfflinePartition)
     partitionStateMachine.handleStateChanges(partitionsForDeletedTopic, 
NonExistentPartition)
     topicsToBeDeleted -= topic
+    partitionsToBeDeleted.retain(_.topic != topic)
     controllerContext.zkClient.deleteRecursive(ZkUtils.getTopicPath(topic))
     
controllerContext.zkClient.deleteRecursive(ZkUtils.getTopicConfigPath(topic))
     controllerContext.zkClient.delete(ZkUtils.getDeleteTopicPath(topic))
@@ -277,7 +281,8 @@ class TopicDeletionManager(controller: KafkaController,
   private def onTopicDeletion(topics: Set[String]) {
     info("Topic deletion callback for %s".format(topics.mkString(",")))
     // send update metadata so that brokers stop serving data for topics to be 
deleted
-    
controller.sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
+    val partitions = topics.flatMap(controllerContext.partitionsForTopic)
+    
controller.sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq,
 partitions)
     val partitionReplicaAssignmentByTopic = 
controllerContext.partitionReplicaAssignment.groupBy(p => p._1.topic)
     topics.foreach { topic =>
       
onPartitionDeletion(partitionReplicaAssignmentByTopic(topic).map(_._1).toSet)
@@ -322,8 +327,8 @@ class TopicDeletionManager(controller: KafkaController,
   /**
    * This callback is invoked by the delete topic callback with the list of 
partitions for topics to be deleted
    * It does the following -
-   * 1. Send UpdateMetadataRequest to all live brokers (that are not shutting 
down) with all partitions except those for
-   *    which the topics are being deleted. The brokers start rejecting all 
client requests with UnknownTopicOrPartitionException
+   * 1. Send UpdateMetadataRequest to all live brokers (that are not shutting 
down) for partitions that are being
+   *    deleted. The brokers start rejecting all client requests with 
UnknownTopicOrPartitionException
    * 2. Move all replicas for the partitions to OfflineReplica state. This 
will send StopReplicaRequest to the replicas
    *    and LeaderAndIsrRequest to the leader with the shrunk ISR. When the 
leader replica itself is moved to OfflineReplica state,
    *    it will skip sending the LeaderAndIsrRequest since the leader will be 
updated to -1

http://git-wip-us.apache.org/repos/asf/kafka/blob/62fcaedd/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index c068ef6..705d87e 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -137,26 +137,20 @@ class KafkaApis(val requestChannel: RequestChannel,
       // cache the list of alive brokers in the cluster
       updateMetadataRequest.aliveBrokers.foreach(b => aliveBrokers.put(b.id, 
b))
       updateMetadataRequest.partitionStateInfos.foreach { partitionState =>
-        metadataCache.put(partitionState._1, partitionState._2)
-        stateChangeLogger.trace(("Broker %d cached leader info %s for 
partition %s in response to UpdateMetadata request " +
-          "sent by controller %d epoch %d with correlation id 
%d").format(brokerId, partitionState._2, partitionState._1,
-          updateMetadataRequest.controllerId, 
updateMetadataRequest.controllerEpoch, updateMetadataRequest.correlationId))
-      }
-      // remove the topics that don't exist in the UpdateMetadata request 
since those are the topics that are
-      // currently being deleted by the controller
-      val topicsKnownToThisBroker = metadataCache.map {
-        case(topicAndPartition, partitionStateInfo) => topicAndPartition.topic 
}.toSet
-      val topicsKnownToTheController = 
updateMetadataRequest.partitionStateInfos.map {
-        case(topicAndPartition, partitionStateInfo) => topicAndPartition.topic 
}.toSet
-      val deletedTopics = topicsKnownToThisBroker -- topicsKnownToTheController
-      val partitionsToBeDeleted = metadataCache.filter {
-        case(topicAndPartition, partitionStateInfo) => 
deletedTopics.contains(topicAndPartition.topic)
-      }.keySet
-      partitionsToBeDeleted.foreach { partition =>
-        metadataCache.remove(partition)
-        stateChangeLogger.trace(("Broker %d deleted partition %s from metadata 
cache in response to UpdateMetadata request " +
-          "sent by controller %d epoch %d with correlation id 
%d").format(brokerId, partition,
-          updateMetadataRequest.controllerId, 
updateMetadataRequest.controllerEpoch, updateMetadataRequest.correlationId))
+        if (partitionState._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader 
== LeaderAndIsr.LeaderDuringDelete) {
+          val partition = partitionState._1
+          metadataCache.remove(partition)
+          stateChangeLogger.trace(("Broker %d deleted partition %s from 
metadata cache in response to UpdateMetadata request " +
+                                   "sent by controller %d epoch %d with 
correlation id %d")
+                                   .format(brokerId, partition, 
updateMetadataRequest.controllerId,
+                                           
updateMetadataRequest.controllerEpoch, updateMetadataRequest.correlationId))
+        } else {
+          metadataCache.put(partitionState._1, partitionState._2)
+          stateChangeLogger.trace(("Broker %d cached leader info %s for 
partition %s in response to UpdateMetadata request " +
+                                   "sent by controller %d epoch %d with 
correlation id %d")
+                                   .format(brokerId, partitionState._2, 
partitionState._1, updateMetadataRequest.controllerId,
+                                           
updateMetadataRequest.controllerEpoch, updateMetadataRequest.correlationId))
+        }
       }
     }
     val updateMetadataResponse = new 
UpdateMetadataResponse(updateMetadataRequest.correlationId)

Reply via email to