Repository: kafka
Updated Branches:
  refs/heads/1.0.0 0222a35db -> 5dab884d7


KAFKA-5879; Controller should read the latest IsrChangeNotification znodes when handling IsrChangeNotification event

Author: Dong Lin <[email protected]>

Reviewers: Ismael Juma <[email protected]>, Jun Rao <[email protected]>

Closes #3840 from lindong28/KAFKA-5879

(cherry picked from commit b5b266eeeccc18c5cfbd37fa59ab5d9cf3676e34)
Signed-off-by: Jun Rao <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5dab884d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5dab884d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5dab884d

Branch: refs/heads/1.0.0
Commit: 5dab884d74183ef0409c8efab465d4223827a604
Parents: 0222a35
Author: Dong Lin <[email protected]>
Authored: Wed Oct 4 17:37:35 2017 -0700
Committer: Jun Rao <[email protected]>
Committed: Wed Oct 4 17:37:46 2017 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/controller/KafkaController.scala    | 11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/5dab884d/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 4ea61ab..811ff67 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -993,7 +993,8 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
     // remove this partition from that list
     val updatedPartitionsBeingReassigned = partitionsBeingReassigned - topicAndPartition
     // write the new list to zookeeper
-    zkUtils.updatePartitionReassignmentData(updatedPartitionsBeingReassigned.mapValues(_.newReplicas))
+    if (updatedPartitionsBeingReassigned.size < partitionsBeingReassigned.size)
+      zkUtils.updatePartitionReassignmentData(updatedPartitionsBeingReassigned.mapValues(_.newReplicas))
     // update the cache. NO-OP if the partition's reassignment was never started
     controllerContext.partitionsBeingReassigned.remove(topicAndPartition)
   }
@@ -1366,16 +1367,20 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
     def state = ControllerState.IsrChange
 
     override def process(): Unit = {
+      // Read the current isr change notification znodes from ZK again instead of using sequenceNumbers
+      // to increase the odds of processing recent isr changes in a single ControllerEvent
+      // and to reduce the odds of trying to access znodes that have already been deleted (KAFKA-5879).
+      val currentSequenceNumbers = zkUtils.getChildrenParentMayNotExist(ZkUtils.IsrChangeNotificationPath)
       if (!isActive) return
       try {
-        val topicAndPartitions = sequenceNumbers.flatMap(getTopicAndPartition).toSet
+        val topicAndPartitions = currentSequenceNumbers.flatMap(getTopicAndPartition).toSet
         if (topicAndPartitions.nonEmpty) {
           updateLeaderAndIsrCache(topicAndPartitions)
           processUpdateNotifications(topicAndPartitions)
         }
       } finally {
         // delete the notifications
-        sequenceNumbers.map(x => controllerContext.zkUtils.deletePath(ZkUtils.IsrChangeNotificationPath + "/" + x))
+        currentSequenceNumbers.map(x => controllerContext.zkUtils.deletePath(ZkUtils.IsrChangeNotificationPath + "/" + x))
       }
     }
 

Reply via email to