[ https://issues.apache.org/jira/browse/KAFKA-3436?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16300831#comment-16300831 ]

ASF GitHub Bot commented on KAFKA-3436:
---------------------------------------

ijuma closed pull request #1149: KAFKA-3436: Speed up controlled shutdown.
URL: https://github.com/apache/kafka/pull/1149

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala 
b/core/src/main/scala/kafka/controller/KafkaController.scala
index 103f6cf575d..41e1d9d1dc0 100755
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -17,32 +17,29 @@
 package kafka.controller
 
 import java.util
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.locks.ReentrantLock
 
-import org.apache.kafka.common.errors.{BrokerNotAvailableException, ControllerMovedException}
-import org.apache.kafka.common.protocol.ApiKeys
-import org.apache.kafka.common.requests.{AbstractRequest, AbstractRequestResponse}
-
-import scala.collection._
 import com.yammer.metrics.core.Gauge
-import java.util.concurrent.TimeUnit
-import kafka.admin.AdminUtils
 import kafka.admin.PreferredReplicaLeaderElectionCommand
 import kafka.api._
 import kafka.cluster.Broker
 import kafka.common._
-import kafka.log.LogConfig
-import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
+import kafka.metrics.{KafkaMetricsGroup, KafkaTimer}
+import kafka.server._
+import kafka.utils.CoreUtils._
 import kafka.utils.ZkUtils._
 import kafka.utils._
-import kafka.utils.CoreUtils._
-import org.apache.zookeeper.Watcher.Event.KeeperState
+import org.I0Itec.zkclient.exception.{ZkNoNodeException, ZkNodeExistsException}
+import org.I0Itec.zkclient.{IZkChildListener, IZkDataListener, IZkStateListener, ZkClient}
+import org.apache.kafka.common.errors.{BrokerNotAvailableException, ControllerMovedException}
 import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.common.requests.{AbstractRequest, AbstractRequestResponse}
 import org.apache.kafka.common.utils.Time
-import org.I0Itec.zkclient.{IZkChildListener, IZkDataListener, IZkStateListener, ZkClient, ZkConnection}
-import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException}
-import java.util.concurrent.locks.ReentrantLock
-import kafka.server._
-import kafka.common.TopicAndPartition
+import org.apache.zookeeper.Watcher.Event.KeeperState
+
+import scala.collection._
 
 class ControllerContext(val zkUtils: ZkUtils,
                         val zkSessionTimeout: Int) {
@@ -222,6 +219,8 @@ class KafkaController(val config : KafkaConfig, zkUtils: 
ZkUtils, val brokerStat
     "id_%d-host_%s-port_%d".format(config.brokerId, 
controllerListener.get.host, controllerListener.get.port)
   }
 
+  def isValidController = zkUtils.getController == config.brokerId
+
   /**
    * On clean shutdown, the controller first determines the partitions that the
    * shutting down broker leads, and moves leadership of those partitions to 
another broker
@@ -254,43 +253,40 @@ class KafkaController(val config : KafkaConfig, zkUtils: 
ZkUtils, val brokerStat
             .map(topicAndPartition => (topicAndPartition, 
controllerContext.partitionReplicaAssignment(topicAndPartition).size))
         }
 
-      allPartitionsAndReplicationFactorOnBroker.foreach {
-        case(topicAndPartition, replicationFactor) =>
-          // Move leadership serially to relinquish lock.
-          inLock(controllerContext.controllerLock) {
+      val leadersOnBroker = new mutable.HashSet[TopicAndPartition]
+      val followersOnBroker = new mutable.HashSet[PartitionAndReplica]
+      inLock(controllerContext.controllerLock) {
+        allPartitionsAndReplicationFactorOnBroker.foreach {
+          case (topicAndPartition, replicationFactor) =>
             
controllerContext.partitionLeadershipInfo.get(topicAndPartition).foreach { 
currLeaderIsrAndControllerEpoch =>
               if (replicationFactor > 1) {
                 if (currLeaderIsrAndControllerEpoch.leaderAndIsr.leader == id) 
{
-                  // If the broker leads the topic partition, transition the 
leader and update isr. Updates zk and
-                  // notifies all affected brokers
-                  
partitionStateMachine.handleStateChanges(Set(topicAndPartition), 
OnlinePartition,
-                    controlledShutdownPartitionLeaderSelector)
+                  leadersOnBroker += topicAndPartition
                 } else {
-                  // Stop the replica first. The state change below initiates 
ZK changes which should take some time
-                  // before which the stop replica request should be completed 
(in most cases)
-                  try {
-                    brokerRequestBatch.newBatch()
-                    
brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), 
topicAndPartition.topic,
-                      topicAndPartition.partition, deletePartition = false)
-                    brokerRequestBatch.sendRequestsToBrokers(epoch)
-                  } catch {
-                    case e : IllegalStateException => {
-                      // Resign if the controller is in an illegal state
-                      error("Forcing the controller to resign")
-                      brokerRequestBatch.clear()
-                      controllerElector.resign()
-
-                      throw e
-                    }
-                  }
-                  // If the broker is a follower, updates the isr in ZK and 
notifies the current leader
-                  
replicaStateMachine.handleStateChanges(Set(PartitionAndReplica(topicAndPartition.topic,
-                    topicAndPartition.partition, id)), OfflineReplica)
+                  followersOnBroker += 
PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition, id)
                 }
               }
             }
-          }
+        }
+      }
+
+      // We group the changes into batches of 1000 partitions to avoid holding the controller lock for too long.
+      leadersOnBroker.grouped(1000).foreach { leaderGroup =>
+        inLock(controllerContext.controllerLock) {
+          // If the broker leads the topic partitions, transition all the leaders and update isr. Updates zk and
+          // notifies all affected brokers
+          partitionStateMachine.handleStateChanges(leaderGroup, OnlinePartition,
+            controlledShutdownPartitionLeaderSelector)
+        }
       }
+
+      followersOnBroker.grouped(1000).foreach { followerGroup =>
+        inLock(controllerContext.controllerLock) {
+          // If the broker is a follower of the partitions, updates the isr in ZK and notifies the current leaders
+          replicaStateMachine.handleStateChanges(followerGroup, OfflineReplica)
+        }
+      }
+
       def replicatedPartitionsBrokerLeads() = 
inLock(controllerContext.controllerLock) {
         trace("All leaders = " + 
controllerContext.partitionLeadershipInfo.mkString(","))
         controllerContext.partitionLeadershipInfo.filter {
@@ -821,6 +817,30 @@ class KafkaController(val config : KafkaConfig, zkUtils: 
ZkUtils, val brokerStat
     controllerContext.controllerChannelManager.startup()
   }
 
+  /**
+   * The method reads leader and ISR info from zookeeper. It is used before invoking state changes to make sure the
+   * controller is using the up-to-date information.
+   */
+  def refreshLeaderAndIsrFromZk(topicAndPartitions: Set[TopicAndPartition]): Map[TopicAndPartition, ZkLeaderAndIsrReadResult] = {
+    val leaderAndIsrReadResults = ReplicationUtils.asyncGetLeaderIsrAndEpochForPartitions(zkUtils, topicAndPartitions)
+    leaderAndIsrReadResults.foreach { case (tap, leaderAndIsrReadResult) =>
+      leaderAndIsrReadResult.leaderIsrAndControllerEpochOpt match {
+        case Some(leaderIsrAndControllerEpoch) =>
+          if (leaderIsrAndControllerEpoch.controllerEpoch > epoch)
+            throw new IllegalStateException(s"Leader and isr path written by another controller. This probably " +
+              s"means the current controller with epoch $epoch went through a soft failure and another " +
+              s"controller was elected with epoch ${leaderIsrAndControllerEpoch.controllerEpoch}.")
+        case None =>
+      }
+      leaderAndIsrReadResult.exceptionOpt match {
+        case Some(exception) =>
+          throw new KafkaException(s"Asynchronously reading LeaderAndIsr information for $tap", exception)
+        case None =>
+      }
+    }
+    leaderAndIsrReadResults
+  }
+
   def updateLeaderAndIsrCache(topicAndPartitions: Set[TopicAndPartition] = 
controllerContext.partitionReplicaAssignment.keySet) {
     val leaderAndIsrInfo = 
zkUtils.getPartitionLeaderAndIsrForTopics(zkUtils.zkClient, topicAndPartitions)
     for((topicPartition, leaderIsrAndControllerEpoch) <- leaderAndIsrInfo)
@@ -1040,73 +1060,6 @@ class KafkaController(val config : KafkaConfig, zkUtils: 
ZkUtils, val brokerStat
     }
   }
 
-  /**
-   * Removes a given partition replica from the ISR; if it is not the current
-   * leader and there are sufficient remaining replicas in ISR.
-   * @param topic topic
-   * @param partition partition
-   * @param replicaId replica Id
-   * @return the new leaderAndIsr (with the replica removed if it was present),
-   *         or None if leaderAndIsr is empty.
-   */
-  def removeReplicaFromIsr(topic: String, partition: Int, replicaId: Int): 
Option[LeaderIsrAndControllerEpoch] = {
-    val topicAndPartition = TopicAndPartition(topic, partition)
-    debug("Removing replica %d from ISR %s for partition %s.".format(replicaId,
-      
controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.isr.mkString(","),
 topicAndPartition))
-    var finalLeaderIsrAndControllerEpoch: Option[LeaderIsrAndControllerEpoch] 
= None
-    var zkWriteCompleteOrUnnecessary = false
-    while (!zkWriteCompleteOrUnnecessary) {
-      // refresh leader and isr from zookeeper again
-      val leaderIsrAndEpochOpt = 
ReplicationUtils.getLeaderIsrAndEpochForPartition(zkUtils, topic, partition)
-      zkWriteCompleteOrUnnecessary = leaderIsrAndEpochOpt match {
-        case Some(leaderIsrAndEpoch) => // increment the leader epoch even if 
the ISR changes
-          val leaderAndIsr = leaderIsrAndEpoch.leaderAndIsr
-          val controllerEpoch = leaderIsrAndEpoch.controllerEpoch
-          if(controllerEpoch > epoch)
-            throw new StateChangeFailedException("Leader and isr path written 
by another controller. This probably" +
-              "means the current controller with epoch %d went through a soft 
failure and another ".format(epoch) +
-              "controller was elected with epoch %d. Aborting state change by 
this controller".format(controllerEpoch))
-          if (leaderAndIsr.isr.contains(replicaId)) {
-            // if the replica to be removed from the ISR is also the leader, 
set the new leader value to -1
-            val newLeader = if (replicaId == leaderAndIsr.leader) 
LeaderAndIsr.NoLeader else leaderAndIsr.leader
-            var newIsr = leaderAndIsr.isr.filter(b => b != replicaId)
-
-            // if the replica to be removed from the ISR is the last surviving 
member of the ISR and unclean leader election
-            // is disallowed for the corresponding topic, then we must 
preserve the ISR membership so that the replica can
-            // eventually be restored as the leader.
-            if (newIsr.isEmpty && !LogConfig.fromProps(config.originals, 
AdminUtils.fetchEntityConfig(zkUtils,
-              ConfigType.Topic, 
topicAndPartition.topic)).uncleanLeaderElectionEnable) {
-              info("Retaining last ISR %d of partition %s since unclean leader 
election is disabled".format(replicaId, topicAndPartition))
-              newIsr = leaderAndIsr.isr
-            }
-
-            val newLeaderAndIsr = new LeaderAndIsr(newLeader, 
leaderAndIsr.leaderEpoch + 1,
-              newIsr, leaderAndIsr.zkVersion + 1)
-            // update the new leadership decision in zookeeper or retry
-            val (updateSucceeded, newVersion) = 
ReplicationUtils.updateLeaderAndIsr(zkUtils, topic, partition,
-              newLeaderAndIsr, epoch, leaderAndIsr.zkVersion)
-
-            newLeaderAndIsr.zkVersion = newVersion
-            finalLeaderIsrAndControllerEpoch = 
Some(LeaderIsrAndControllerEpoch(newLeaderAndIsr, epoch))
-            controllerContext.partitionLeadershipInfo.put(topicAndPartition, 
finalLeaderIsrAndControllerEpoch.get)
-            if (updateSucceeded)
-              info("New leader and ISR for partition %s is 
%s".format(topicAndPartition, newLeaderAndIsr.toString()))
-            updateSucceeded
-          } else {
-            warn("Cannot remove replica %d from ISR of partition %s since it 
is not in the ISR. Leader = %d ; ISR = %s"
-                 .format(replicaId, topicAndPartition, leaderAndIsr.leader, 
leaderAndIsr.isr))
-            finalLeaderIsrAndControllerEpoch = 
Some(LeaderIsrAndControllerEpoch(leaderAndIsr, epoch))
-            controllerContext.partitionLeadershipInfo.put(topicAndPartition, 
finalLeaderIsrAndControllerEpoch.get)
-            true
-          }
-        case None =>
-          warn("Cannot remove replica %d from ISR of %s - leaderAndIsr is 
empty.".format(replicaId, topicAndPartition))
-          true
-      }
-    }
-    finalLeaderIsrAndControllerEpoch
-  }
-
   /**
    * Does not change leader or isr, but just increments the leader epoch
    * @param topic topic
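
The controlled-shutdown change above replaces the old per-partition lock/unlock cycle with one pass that collects the partitions the broker leads or follows, followed by state changes in groups of 1000, re-taking the controller lock per group. A minimal standalone sketch of that pattern, with invented names (not part of the PR):

import java.util.concurrent.locks.ReentrantLock

object GroupedStateChangeSketch {
  private val controllerLock = new ReentrantLock()

  // Same idea as kafka.utils.CoreUtils.inLock: run the body while holding the lock.
  private def inLock[T](lock: ReentrantLock)(body: => T): T = {
    lock.lock()
    try body finally lock.unlock()
  }

  // Process the partitions in fixed-size groups so the lock is never held across the whole set.
  def processInGroups[T](partitions: Set[T], groupSize: Int)(handleGroup: Set[T] => Unit): Unit = {
    partitions.grouped(groupSize).foreach { group =>
      inLock(controllerLock) {
        handleGroup(group) // e.g. partitionStateMachine.handleStateChanges(group, OnlinePartition, selector)
      }
    }
  }
}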
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala 
b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index ec03b84395b..180e58609f4 100755
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -21,7 +21,7 @@ import collection.JavaConversions
 import collection.mutable.Buffer
 import java.util.concurrent.atomic.AtomicBoolean
 import kafka.api.LeaderAndIsr
-import kafka.common.{LeaderElectionNotNeededException, TopicAndPartition, StateChangeFailedException, NoReplicaOnlineException}
+import kafka.common.{KafkaException, LeaderElectionNotNeededException, TopicAndPartition, StateChangeFailedException, NoReplicaOnlineException}
 import kafka.utils.{Logging, ReplicationUtils}
 import kafka.utils.ZkUtils._
 import org.I0Itec.zkclient.{IZkDataListener, IZkChildListener}
@@ -47,6 +47,7 @@ class PartitionStateMachine(controller: KafkaController) 
extends Logging {
   private val zkUtils = controllerContext.zkUtils
   private val partitionState: mutable.Map[TopicAndPartition, PartitionState] = 
mutable.Map.empty
   private val brokerRequestBatch = new ControllerBrokerRequestBatch(controller)
+  private val leaderAndIsrUpdateBatch = new ZkLeaderAndIsrUpdateBatch(zkUtils)
   private val hasStarted = new AtomicBoolean(false)
   private val noOpPartitionLeaderSelector = new 
NoOpLeaderSelector(controllerContext)
   private val topicChangeListener = new TopicChangeListener()
@@ -110,21 +111,12 @@ class PartitionStateMachine(controller: KafkaController) 
extends Logging {
    * state. This is called on a successful controller election and on broker 
changes
    */
   def triggerOnlinePartitionStateChange() {
-    try {
-      brokerRequestBatch.newBatch()
-      // try to move all partitions in NewPartition or OfflinePartition state 
to OnlinePartition state except partitions
-      // that belong to topics to be deleted
-      for((topicAndPartition, partitionState) <- partitionState
-          
if(!controller.deleteTopicManager.isTopicQueuedUpForDeletion(topicAndPartition.topic)))
 {
-        if(partitionState.equals(OfflinePartition) || 
partitionState.equals(NewPartition))
-          handleStateChange(topicAndPartition.topic, 
topicAndPartition.partition, OnlinePartition, 
controller.offlinePartitionSelector,
-                            (new CallbackBuilder).build)
-      }
-      brokerRequestBatch.sendRequestsToBrokers(controller.epoch)
-    } catch {
-      case e: Throwable => error("Error while moving some partitions to the 
online state", e)
-      // TODO: It is not enough to bail out and log an error, it is important 
to trigger leader election for those partitions
-    }
+    val partitions = partitionState.filter { case (topicAndPartition, 
partitionState) =>
+      val isBeingDeleted = 
controller.deleteTopicManager.isTopicQueuedUpForDeletion(topicAndPartition.topic)
+      !isBeingDeleted && (partitionState.equals(OfflinePartition) || 
partitionState.equals(NewPartition))
+    }.keySet
+
+    handleStateChanges(partitions, OnlinePartition, 
controller.offlinePartitionSelector, (new CallbackBuilder).build)
   }
 
   def partitionsInState(state: PartitionState): Set[TopicAndPartition] = {
@@ -141,11 +133,37 @@ class PartitionStateMachine(controller: KafkaController) 
extends Logging {
                          callbacks: Callbacks = (new CallbackBuilder).build) {
     info("Invoking state change to %s for partitions %s".format(targetState, 
partitions.mkString(",")))
     try {
-      brokerRequestBatch.newBatch()
-      partitions.foreach { topicAndPartition =>
-        handleStateChange(topicAndPartition.topic, 
topicAndPartition.partition, targetState, leaderSelector, callbacks)
-      }
-      brokerRequestBatch.sendRequestsToBrokers(controller.epoch)
+      val remainingPartitions = collection.mutable.Set(partitions.toArray:_*)
+      do {
+        // First refresh all the LeaderAndIsr info from zookeeper.
+        // Make sure all the LeaderAndIsr information is successfully read and this broker is still the valid controller.
+        val leaderAndIsrReadResults = controller.refreshLeaderAndIsrFromZk(remainingPartitions)
+        trace(s"Refreshed LeaderAndIsr information for partitions: ${leaderAndIsrReadResults}")
+        leaderAndIsrUpdateBatch.newBatch()
+        brokerRequestBatch.newBatch()
+        remainingPartitions.foreach { topicAndPartition =>
+          val leaderIsrAndControllerEpoch = leaderAndIsrReadResults(topicAndPartition).leaderIsrAndControllerEpochOpt
+          handleStateChange(topicAndPartition.topic, topicAndPartition.partition, leaderIsrAndControllerEpoch,
+            targetState, leaderSelector, callbacks)
+        }
+        // Make sure we update zk first because the callbacks will fill in the BrokerRequestBatch
+        try {
+          leaderAndIsrUpdateBatch.writeLeaderAndIsrUpdateToZk(controller.epoch, Some(() => controller.isValidController))
+        } catch {
+          // The exception thrown here is likely caused by a zk disconnection or session expiration. In this case
+          // we swallow the error and retry. LeaderAndIsr updates that have been sent but not acked yet might be
+          // applied again, but that does not hurt.
+          case e: Throwable => error("Error while updating LeaderAndIsr in zookeeper.", e)
+        }
+        brokerRequestBatch.sendRequestsToBrokers(controller.epoch)
+        remainingPartitions.retain(leaderAndIsrUpdateBatch.containsPartition(_))
+        if (!remainingPartitions.isEmpty) {
+          debug(s"The following partitions are still waiting for state change: $remainingPartitions.")
+          if (!controller.isValidController)
+            throw new StateChangeFailedException(s"Controller ${controller.config.brokerId} epoch ${controller.epoch}" +
+              s" is no longer the valid controller.")
+        }
+      } while (!remainingPartitions.isEmpty)
     }catch {
       case e: Throwable => error("Error while moving some partitions to %s 
state".format(targetState), e)
       // TODO: It is not enough to bail out and log an error, it is important 
to trigger state changes for those partitions
@@ -175,7 +193,10 @@ class PartitionStateMachine(controller: KafkaController) 
extends Logging {
    * @param partition   The partition for which the state transition is invoked
    * @param targetState The end state that the partition should be moved to
    */
-  private def handleStateChange(topic: String, partition: Int, targetState: 
PartitionState,
+  private def handleStateChange(topic: String,
+                                partition: Int,
+                                leaderIsrAndControllerEpoch: 
Option[LeaderIsrAndControllerEpoch],
+                                targetState: PartitionState,
                                 leaderSelector: PartitionLeaderSelector,
                                 callbacks: Callbacks) {
     val topicAndPartition = TopicAndPartition(topic, partition)
@@ -201,16 +222,17 @@ class PartitionStateMachine(controller: KafkaController) 
extends Logging {
             case NewPartition =>
               // initialize leader and isr path for new partition
               initializeLeaderAndIsrForPartition(topicAndPartition)
+              partitionState.put(topicAndPartition, OnlinePartition)
+              val leader = 
controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.leader
+              stateChangeLogger.trace("Controller %d epoch %d changed 
partition %s from %s to %s with leader %d"
+                .format(controllerId, controller.epoch, topicAndPartition, 
currState, targetState, leader))
             case OfflinePartition =>
-              electLeaderForPartition(topic, partition, leaderSelector)
+              electLeaderForPartition(topic, partition, OfflinePartition, 
leaderIsrAndControllerEpoch, leaderSelector)
             case OnlinePartition => // invoked when the leader needs to be 
re-elected
-              electLeaderForPartition(topic, partition, leaderSelector)
+              electLeaderForPartition(topic, partition, OnlinePartition, 
leaderIsrAndControllerEpoch, leaderSelector)
             case _ => // should never come here since illegal previous states 
are checked above
           }
-          partitionState.put(topicAndPartition, OnlinePartition)
-          val leader = 
controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.leader
-          stateChangeLogger.trace("Controller %d epoch %d changed partition %s 
from %s to %s with leader %d"
-                                    .format(controllerId, controller.epoch, 
topicAndPartition, currState, targetState, leader))
+
            // post: partition has a leader
         case OfflinePartition =>
           // pre: partition should be in New or Online state
@@ -316,49 +338,57 @@ class PartitionStateMachine(controller: KafkaController) 
extends Logging {
   /**
    * Invoked on the OfflinePartition,OnlinePartition->OnlinePartition state 
change.
    * It invokes the leader election API to elect a leader for the input 
offline partition
-   * @param topic               The topic of the offline partition
-   * @param partition           The offline partition
-   * @param leaderSelector      Specific leader selector (e.g., 
offline/reassigned/etc.)
+   * @param topic                                 The topic of the offline 
partition
+   * @param partition                             The offline partition
+   * @param currState                             The current state of the 
partition.
+   * @param leaderIsrAndControllerEpochOpt        The current leader and isr 
information in zookeeper
+   * @param leaderSelector                        Specific leader selector 
(e.g., offline/reassigned/etc.)
    */
-  def electLeaderForPartition(topic: String, partition: Int, leaderSelector: 
PartitionLeaderSelector) {
+  def electLeaderForPartition(topic: String,
+                              partition: Int,
+                              currState: PartitionState,
+                              leaderIsrAndControllerEpochOpt: 
Option[LeaderIsrAndControllerEpoch],
+                              leaderSelector: PartitionLeaderSelector) {
     val topicAndPartition = TopicAndPartition(topic, partition)
     // handle leader election for the partitions whose leader is no longer 
alive
     stateChangeLogger.trace("Controller %d epoch %d started leader election 
for partition %s"
                               .format(controllerId, controller.epoch, 
topicAndPartition))
     try {
-      var zookeeperPathUpdateSucceeded: Boolean = false
-      var newLeaderAndIsr: LeaderAndIsr = null
-      var replicasForThisPartition: Seq[Int] = Seq.empty[Int]
-      while(!zookeeperPathUpdateSucceeded) {
-        val currentLeaderIsrAndEpoch = 
getLeaderIsrAndEpochOrThrowException(topic, partition)
-        val currentLeaderAndIsr = currentLeaderIsrAndEpoch.leaderAndIsr
-        val controllerEpoch = currentLeaderIsrAndEpoch.controllerEpoch
-        if (controllerEpoch > controller.epoch) {
-          val failMsg = ("aborted leader election for partition [%s,%d] since 
the LeaderAndIsr path was " +
-                         "already written by another controller. This probably 
means that the current controller %d went through " +
-                         "a soft failure and another controller was elected 
with epoch %d.")
-                           .format(topic, partition, controllerId, 
controllerEpoch)
-          stateChangeLogger.error("Controller %d epoch %d 
".format(controllerId, controller.epoch) + failMsg)
-          throw new StateChangeFailedException(failMsg)
-        }
-        // elect new leader or throw exception
-        val (leaderAndIsr, replicas) = 
leaderSelector.selectLeader(topicAndPartition, currentLeaderAndIsr)
-        val (updateSucceeded, newVersion) = 
ReplicationUtils.updateLeaderAndIsr(zkUtils, topic, partition,
-          leaderAndIsr, controller.epoch, currentLeaderAndIsr.zkVersion)
-        newLeaderAndIsr = leaderAndIsr
-        newLeaderAndIsr.zkVersion = newVersion
-        zookeeperPathUpdateSucceeded = updateSucceeded
-        replicasForThisPartition = replicas
+
+      val currLeaderIsrAndEpoch = leaderIsrAndControllerEpochOpt.getOrElse({
+        val failMsg = "LeaderAndIsr information doesn't exist for partition %s 
in %s state"
+          .format(topicAndPartition, partitionState(topicAndPartition))
+        throw new StateChangeFailedException(failMsg)
+      })
+      val currLeaderAndIsr = currLeaderIsrAndEpoch.leaderAndIsr
+      // elect new leader or throw exception
+      val (newLeaderAndIsr, replicas) = 
leaderSelector.selectLeader(topicAndPartition, currLeaderAndIsr)
+      leaderAndIsrUpdateBatch.addLeaderAndIsrUpdate(topicAndPartition,
+                                                    newLeaderAndIsr,
+                                                    currLeaderAndIsr.zkVersion,
+                                                    
onLeaderAndIsrUpdateSuccess)
+
+      def onLeaderAndIsrUpdateSuccess(topicAndPartition: TopicAndPartition,
+                                      leaderAndIsrUpdateResult: 
ZkLeaderAndIsrUpdateResult) = {
+        val newLeaderAndIsr = leaderAndIsrUpdateResult.leaderAndIsr
+        val newZkVersion = leaderAndIsrUpdateResult.newZkVersion
+        updateLocalState(newLeaderAndIsr, newZkVersion)
+      }
+
+      def updateLocalState(newLeaderAndIsr: LeaderAndIsr, newZkVersion: Int) = 
{
+        val newLeaderIsrAndControllerEpoch = new 
LeaderIsrAndControllerEpoch(newLeaderAndIsr, controller.epoch)
+        // update the leader cache
+        controllerContext.partitionLeadershipInfo.put(topicAndPartition, 
newLeaderIsrAndControllerEpoch)
+        debug(s"After leader election, leader cache is updated to " +
+          s"${controllerContext.partitionLeadershipInfo.map(l => (l._1, 
l._2))}")
+        val assignedReplicas = 
controllerContext.partitionReplicaAssignment(TopicAndPartition(topic, 
partition))
+        // store new leader and isr info in cache
+        brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicas, topic, 
partition,
+          newLeaderIsrAndControllerEpoch, assignedReplicas)
+        partitionState.put(topicAndPartition, OnlinePartition)
+        stateChangeLogger.trace(s"Controller $controllerId epoch 
${controller.epoch} changed partition $topicAndPartition " +
+          s"from $currState to $OnlinePartition with leader 
${newLeaderAndIsr.leader}")
       }
-      val newLeaderIsrAndControllerEpoch = new 
LeaderIsrAndControllerEpoch(newLeaderAndIsr, controller.epoch)
-      // update the leader cache
-      controllerContext.partitionLeadershipInfo.put(TopicAndPartition(topic, 
partition), newLeaderIsrAndControllerEpoch)
-      stateChangeLogger.trace("Controller %d epoch %d elected leader %d for 
Offline partition %s"
-                                .format(controllerId, controller.epoch, 
newLeaderAndIsr.leader, topicAndPartition))
-      val replicas = 
controllerContext.partitionReplicaAssignment(TopicAndPartition(topic, 
partition))
-      // store new leader and isr info in cache
-      
brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasForThisPartition, 
topic, partition,
-        newLeaderIsrAndControllerEpoch, replicas)
     } catch {
       case lenne: LeaderElectionNotNeededException => // swallow
       case nroe: NoReplicaOnlineException => throw nroe
@@ -367,7 +397,6 @@ class PartitionStateMachine(controller: KafkaController) 
extends Logging {
         stateChangeLogger.error("Controller %d epoch %d ".format(controllerId, 
controller.epoch) + failMsg)
         throw new StateChangeFailedException(failMsg, sce)
     }
-    debug("After leader election, leader cache is updated to 
%s".format(controllerContext.partitionLeadershipInfo.map(l => (l._1, l._2))))
   }
 
   private def registerTopicChangeListener() = {
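
The reworked handleStateChanges above retries only the partitions whose conditional LeaderAndIsr write did not complete, and gives up once this broker is no longer the controller. A rough sketch of that retry loop, with invented names (not the PR's API):

import scala.collection.mutable

object RetryLoopSketch {
  def retryUntilApplied[P](partitions: Set[P],
                           applyBatch: Set[P] => Set[P], // returns the partitions that still need a retry
                           isStillController: () => Boolean): Unit = {
    val remaining = mutable.Set(partitions.toSeq: _*)
    do {
      // Refresh state, queue the updates and send them; keep whatever is still pending.
      val stillPending = applyBatch(remaining.toSet)
      remaining.retain(stillPending.contains)
      if (remaining.nonEmpty && !isStillController())
        throw new IllegalStateException("No longer the valid controller, aborting the state change.")
    } while (remaining.nonEmpty)
  }
}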
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala 
b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index 2fd8b95506c..04a04c8cf0f 100755
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -16,13 +16,17 @@
 */
 package kafka.controller
 
+import kafka.admin.AdminUtils
+import kafka.api.LeaderAndIsr
+import kafka.log.LogConfig
+import kafka.server.ConfigType
+
 import collection._
 import collection.JavaConversions._
 import java.util.concurrent.atomic.AtomicBoolean
-import kafka.common.{TopicAndPartition, StateChangeFailedException}
+import kafka.common.{KafkaException, TopicAndPartition, StateChangeFailedException}
 import kafka.utils.{ZkUtils, ReplicationUtils, Logging}
 import org.I0Itec.zkclient.IZkChildListener
-import org.apache.log4j.Logger
 import kafka.controller.Callbacks._
 import kafka.utils.CoreUtils._
 
@@ -51,6 +55,7 @@ class ReplicaStateMachine(controller: KafkaController) 
extends Logging {
   private val replicaState: mutable.Map[PartitionAndReplica, ReplicaState] = 
mutable.Map.empty
   private val brokerChangeListener = new BrokerChangeListener()
   private val brokerRequestBatch = new ControllerBrokerRequestBatch(controller)
+  private val leaderAndIsrUpdateBatch = new ZkLeaderAndIsrUpdateBatch(zkUtils)
   private val hasStarted = new AtomicBoolean(false)
   private val stateChangeLogger = KafkaController.stateChangeLogger
 
@@ -110,10 +115,43 @@ class ReplicaStateMachine(controller: KafkaController) 
extends Logging {
     if(replicas.size > 0) {
       info("Invoking state change to %s for replicas %s".format(targetState, 
replicas.mkString(",")))
       try {
-        brokerRequestBatch.newBatch()
-        replicas.foreach(r => handleStateChange(r, targetState, callbacks))
-        brokerRequestBatch.sendRequestsToBrokers(controller.epoch)
-      }catch {
+
+        val remainingTopicAndPartitions = collection.mutable.Set(replicas.map 
{ partitionAndReplica =>
+          new TopicAndPartition(partitionAndReplica.topic, 
partitionAndReplica.partition)
+        }.toArray:_*)
+        val remainingReplicas = collection.mutable.Set(replicas.toArray:_*)
+        do {
+          // First refresh all the LeaderAndIsr info from zookeeper
+          // Make sure all the LeaderAndIsr information is successfully read and this broker is still the valid controller.
+          val leaderAndIsrReadResults = 
controller.refreshLeaderAndIsrFromZk(remainingTopicAndPartitions)
+          trace(s"Refreshed LeaderAndIsr information for replicas: 
${leaderAndIsrReadResults}")
+          leaderAndIsrUpdateBatch.newBatch()
+          brokerRequestBatch.newBatch()
+          remainingReplicas.foreach(r => {
+            val leaderIsrAndControllerEpochOpt =
+              leaderAndIsrReadResults(TopicAndPartition(r.topic, 
r.partition)).leaderIsrAndControllerEpochOpt
+            handleStateChange(r, leaderIsrAndControllerEpochOpt, targetState, 
callbacks)
+          })
+          try {
+            
leaderAndIsrUpdateBatch.writeLeaderAndIsrUpdateToZk(controller.epoch, Some(() 
=> controller.isValidController))
+          } catch {
+            // The exception thrown here is likely caused by a zk disconnection or session expiration. In this case
+            // we swallow the error and retry. LeaderAndIsr updates that have been sent but not acked yet might be
+            // applied again, but that does not hurt.
+            case e: Throwable => error("Error while updating LeaderAndIsr in zookeeper.", e)
+          }
+          brokerRequestBatch.sendRequestsToBrokers(controller.epoch)
+          
remainingTopicAndPartitions.retain(leaderAndIsrUpdateBatch.containsPartition(_))
+          remainingReplicas.retain(r => 
leaderAndIsrUpdateBatch.containsPartition(TopicAndPartition(r.topic, 
r.partition)))
+          if (!remainingReplicas.isEmpty) {
+            debug(s"The following replicas are still waiting for state change: 
$remainingReplicas.")
+            if (!controller.isValidController)
+              throw new StateChangeFailedException(s"Controller 
${controller.config.brokerId} epoch ${controller.epoch}" +
+                s" is no longer the valid controller.")
+          }
+        } while (!remainingReplicas.isEmpty)
+
+      } catch {
         case e: Throwable => error("Error while moving some replicas to %s 
state".format(targetState), e)
       }
     }
@@ -154,7 +192,9 @@ class ReplicaStateMachine(controller: KafkaController) 
extends Logging {
    * @param partitionAndReplica The replica for which the state transition is 
invoked
    * @param targetState The end state that the replica should be moved to
    */
-  def handleStateChange(partitionAndReplica: PartitionAndReplica, targetState: 
ReplicaState,
+  def handleStateChange(partitionAndReplica: PartitionAndReplica,
+                        leaderIsrAndControllerEpochOpt: 
Option[LeaderIsrAndControllerEpoch],
+                        targetState: ReplicaState,
                         callbacks: Callbacks) {
     val topic = partitionAndReplica.topic
     val partition = partitionAndReplica.partition
@@ -171,7 +211,6 @@ class ReplicaStateMachine(controller: KafkaController) 
extends Logging {
         case NewReplica =>
           assertValidPreviousStates(partitionAndReplica, 
List(NonExistentReplica), targetState)
           // start replica as a follower to the current leader for its 
partition
-          val leaderIsrAndControllerEpochOpt = 
ReplicationUtils.getLeaderIsrAndEpochForPartition(zkUtils, topic, partition)
           leaderIsrAndControllerEpochOpt match {
             case Some(leaderIsrAndControllerEpoch) =>
               if(leaderIsrAndControllerEpoch.leaderAndIsr.leader == replicaId)
@@ -244,31 +283,15 @@ class ReplicaStateMachine(controller: KafkaController) 
extends Logging {
           // send stop replica command to the replica so that it stops 
fetching from the leader
           brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), 
topic, partition, deletePartition = false)
           // As an optimization, the controller removes dead replicas from the 
ISR
-          val leaderAndIsrIsEmpty: Boolean =
-            controllerContext.partitionLeadershipInfo.get(topicAndPartition) 
match {
-              case Some(currLeaderIsrAndControllerEpoch) =>
-                controller.removeReplicaFromIsr(topic, partition, replicaId) 
match {
-                  case Some(updatedLeaderIsrAndControllerEpoch) =>
-                    // send the shrunk ISR state change request to all the 
remaining alive replicas of the partition.
-                    val currentAssignedReplicas = 
controllerContext.partitionReplicaAssignment(topicAndPartition)
-                    if 
(!controller.deleteTopicManager.isPartitionToBeDeleted(topicAndPartition)) {
-                      
brokerRequestBatch.addLeaderAndIsrRequestForBrokers(currentAssignedReplicas.filterNot(_
 == replicaId),
-                        topic, partition, updatedLeaderIsrAndControllerEpoch, 
replicaAssignment)
-                    }
-                    replicaState.put(partitionAndReplica, OfflineReplica)
-                    stateChangeLogger.trace("Controller %d epoch %d changed 
state of replica %d for partition %s from %s to %s"
-                      .format(controllerId, controller.epoch, replicaId, 
topicAndPartition, currState, targetState))
-                    false
-                  case None =>
-                    true
-                }
-              case None =>
-                true
-            }
-          if (leaderAndIsrIsEmpty && 
!controller.deleteTopicManager.isPartitionToBeDeleted(topicAndPartition))
-            throw new StateChangeFailedException(
-              "Failed to change state of replica %d for partition %s since the 
leader and isr path in zookeeper is empty"
-              .format(replicaId, topicAndPartition))
+          leaderIsrAndControllerEpochOpt match {
+            case Some(currLeaderIsrAndControllerEpoch) =>
+              removeReplicaFromIsr(topic, partition, replicaId, currState, 
currLeaderIsrAndControllerEpoch)
+            case None =>
+              if 
(!controller.deleteTopicManager.isPartitionToBeDeleted(topicAndPartition))
+                throw new StateChangeFailedException(
+                  "Failed to change state of replica %d for partition %s since 
the leader and isr path in zookeeper is empty"
+                    .format(replicaId, topicAndPartition))
+          }
       }
     }
     catch {
@@ -278,6 +301,89 @@ class ReplicaStateMachine(controller: KafkaController) 
extends Logging {
     }
   }
 
+  /**
+   * This method asynchronously removes the given replica from the ISR of the partition. Typically the controller is only
+   * responsible for updating the leader information but not the ISR information of a partition. But as an optimization
+   * we let the controller remove replicas from the ISR when the replica states change to OFFLINE. In any case, when the
+   * controller successfully updates the ZK path it has to send a LeaderAndIsrRequest to the brokers, otherwise the
+   * brokers will not be able to update the ZK path to change the ISR. If there is no ZK path update from the controller,
+   * the controller does not need to send a LeaderAndIsrRequest but can simply update its local ISR information.
+   *
+   * @param topic the topic
+   * @param partition the partition id
+   * @param replicaId the replica id
+   * @param currState the current state for the replica
+   * @param leaderIsrAndEpoch The current LeaderAndIsr information. It is refreshed from zookeeper before the state
+   *                          change.
+   */
+  def removeReplicaFromIsr(topic: String,
+                           partition: Int,
+                           replicaId: Int,
+                           currState: ReplicaState,
+                           leaderIsrAndEpoch: LeaderIsrAndControllerEpoch) = {
+    val topicAndPartition = TopicAndPartition(topic, partition)
+    // In cases where states of multiple replicas of same partition are 
changed in the same batch, we need to update
+    // the LeaderAndIsr based on the previous pending state change instead of 
using the original LeaderAndIsr
+    // information from zookeeper.
+    val currLeaderAndIsr =
+      if (leaderAndIsrUpdateBatch.containsPartition(topicAndPartition))
+        
leaderAndIsrUpdateBatch.pendingLeaderAndIsrUpdate(topicAndPartition).newLeaderAndIsr
+      else
+        leaderIsrAndEpoch.leaderAndIsr
+    val currZkVersion = leaderIsrAndEpoch.leaderAndIsr.zkVersion
+    val currLeaderEpoch = leaderIsrAndEpoch.leaderAndIsr.leaderEpoch
+
+    debug(s"Removing replica ${replicaId} from ISR ${currLeaderAndIsr.isr}} 
for partition $topicAndPartition.")
+    if (currLeaderAndIsr.isr.contains(replicaId)) {
+      // The caller has already checked the ISR and it contains the replica Id 
to be removed.
+      val newLeader = if (replicaId == currLeaderAndIsr.leader) 
LeaderAndIsr.NoLeader else currLeaderAndIsr.leader
+      var newIsr = currLeaderAndIsr.isr.filter(b => b != replicaId)
+
+      // if the replica to be removed from the ISR is the last surviving 
member of the ISR and unclean leader election
+      // is disallowed for the corresponding topic, then we must preserve the 
ISR membership so that the replica can
+      // eventually be restored as the leader.
+      if (newIsr.isEmpty && !LogConfig.fromProps(controller.config.originals, 
AdminUtils.fetchEntityConfig(zkUtils,
+        ConfigType.Topic, 
topicAndPartition.topic)).uncleanLeaderElectionEnable) {
+        info("Retaining last ISR %d of partition %s since unclean leader 
election is disabled".format(replicaId, topicAndPartition))
+        newIsr = currLeaderAndIsr.isr
+      }
+
+      val newLeaderAndIsr = new LeaderAndIsr(newLeader, currLeaderEpoch + 1, 
newIsr, currZkVersion + 1)
+      leaderAndIsrUpdateBatch.addLeaderAndIsrUpdate(topicAndPartition,
+                                                    newLeaderAndIsr,
+                                                    currZkVersion,
+                                                    
onUpdateLeaderAndIsrSuccess)
+      debug(s"Added new LeaderAndIsr update $newLeaderAndIsr for 
$topic-$partition")
+    } else {
+      warn(s"Cannot remove replica $replicaId from ISR of partition 
$topicAndPartition since it is not in " +
+        s"the ISR. Leader = ${currLeaderAndIsr.leader} ; ISR = 
${currLeaderAndIsr.isr}")
+        updateLocalState(LeaderIsrAndControllerEpoch(currLeaderAndIsr, 
controller.epoch))
+    }
+
+    // If the controller successfully updated zk path, update local info and 
send LeaderAndIsrRequest to
+    // the replicas of the partition.
+    def onUpdateLeaderAndIsrSuccess(topicAndPartition: TopicAndPartition,
+                                    leaderAndIsrUpdateResult: 
ZkLeaderAndIsrUpdateResult) = {
+      trace(s"Successfully updated LeaderAndIsr info for $topicAndPartition. 
New LeaderAndIsr is" +
+        s" ${leaderAndIsrUpdateResult.leaderAndIsr}")
+      val updatedLeaderAndIsr = leaderAndIsrUpdateResult.leaderAndIsr
+      val updatedLeaderIsrAndControllerEpoch = 
LeaderIsrAndControllerEpoch(updatedLeaderAndIsr, controller.epoch)
+      updateLocalState(updatedLeaderIsrAndControllerEpoch)
+    }
+
+    def updateLocalState(updatedLeaderIsrAndControllerEpoch: 
LeaderIsrAndControllerEpoch) {
+      controllerContext.partitionLeadershipInfo.put(topicAndPartition, 
updatedLeaderIsrAndControllerEpoch)
+      val currentAssignedReplicas = 
controllerContext.partitionReplicaAssignment(topicAndPartition)
+      if 
(!controller.deleteTopicManager.isPartitionToBeDeleted(topicAndPartition)) {
+        
brokerRequestBatch.addLeaderAndIsrRequestForBrokers(currentAssignedReplicas.filterNot(_
 == replicaId),
+          topic, partition, updatedLeaderIsrAndControllerEpoch, 
currentAssignedReplicas)
+      }
+      replicaState.put(PartitionAndReplica(topic, partition, replicaId), 
OfflineReplica)
+      stateChangeLogger.trace("Controller %d epoch %d changed state of replica 
%d for partition %s from %s to %s"
+        .format(controllerId, controller.epoch, replicaId, topicAndPartition, 
currState, OfflineReplica))
+    }
+  }
+
   def areAllReplicasForTopicDeleted(topic: String): Boolean = {
     val replicasForTopic = controller.controllerContext.replicasForTopic(topic)
     val replicaStatesForTopic = replicasForTopic.map(r => (r, 
replicaState(r))).toMap
diff --git 
a/core/src/main/scala/kafka/controller/ZkLeaderAndIsrUpdateBatch.scala 
b/core/src/main/scala/kafka/controller/ZkLeaderAndIsrUpdateBatch.scala
new file mode 100644
index 00000000000..7daa90368ce
--- /dev/null
+++ b/core/src/main/scala/kafka/controller/ZkLeaderAndIsrUpdateBatch.scala
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.controller
+
+import java.util.concurrent.CountDownLatch
+
+import kafka.api.LeaderAndIsr
+import kafka.common.TopicAndPartition
+import kafka.utils.ReplicationUtils.trace
+import kafka.utils.{ReplicationUtils, ZkUtils}
+
+/**
+ * A class that batches LeaderAndIsr updates together.
+ */
+class ZkLeaderAndIsrUpdateBatch(zkUtils: ZkUtils) {
+  val leaderAndIsrUpdates = new collection.mutable.HashMap[TopicAndPartition, 
ZkLeaderAndIsrUpdate]
+
+  def newBatch() = leaderAndIsrUpdates.clear()
+
+  def addLeaderAndIsrUpdate(topicAndPartition: TopicAndPartition,
+                            newLeaderAndIsr: LeaderAndIsr,
+                            expectZkVersion: Int,
+                            onSuccess: (TopicAndPartition, 
ZkLeaderAndIsrUpdateResult) => Unit) = {
+    // We need to chain up the callbacks if there are multiple callbacks to be 
fired for the same partition.
+    // This is used by the ReplicaStateMachine where multiple replicas of the 
same partition can change in the same
+    // batch.
+    val onSuccessCallbacks = {
+      if (leaderAndIsrUpdates.contains(topicAndPartition))
+        leaderAndIsrUpdates(topicAndPartition).onSuccessCallbacks :+ onSuccess
+      else
+        List(onSuccess)
+    }
+    leaderAndIsrUpdates += (topicAndPartition -> new 
ZkLeaderAndIsrUpdate(newLeaderAndIsr, expectZkVersion, onSuccessCallbacks))
+  }
+
+  def completeLeaderAndIsrUpdate(topicAndPartition: TopicAndPartition) = {
+    leaderAndIsrUpdates -= topicAndPartition
+  }
+
+  def incompleteUpdates = this synchronized(leaderAndIsrUpdates.size)
+
+  def containsPartition(topicAndPartition: TopicAndPartition): Boolean =
+    leaderAndIsrUpdates.contains(topicAndPartition)
+
+  def pendingLeaderAndIsrUpdate(topicAndPartition: TopicAndPartition) = {
+    leaderAndIsrUpdates(topicAndPartition)
+  }
+
+  def writeLeaderAndIsrUpdateToZk(controllerEpoch: Int, 
preConditionCheckerOpt: Option[() => Boolean] = None) =
+    ReplicationUtils.asyncUpdateLeaderAndIsr(zkUtils, this, controllerEpoch, 
preConditionCheckerOpt)
+}
+
+/**
+ * This is a container class to host the LeaderAndIsr updates in zookeeper.
+ * @param newLeaderAndIsr The new LeaderAndIsr information.
+ * @param expectZkVersion The expected zkVersion of the path
+ * @param onSuccessCallbacks The callbacks to fire when the LeaderAndIsr 
update succeeded. We need a list because there
+ *                           might be multiple replica state changes for the 
same partition in one batch.
+ */
+class ZkLeaderAndIsrUpdate(val newLeaderAndIsr: LeaderAndIsr,
+                           val expectZkVersion: Int,
+                           val onSuccessCallbacks: List[(TopicAndPartition, 
ZkLeaderAndIsrUpdateResult) => Unit])
+
+case class ZkLeaderAndIsrUpdateResult(val leaderAndIsr: LeaderAndIsr,
+                                      val newZkVersion: Int)
+
+case class ZkLeaderAndIsrReadResult(val leaderIsrAndControllerEpochOpt: 
Option[LeaderIsrAndControllerEpoch],
+                                    val exceptionOpt: Option[Exception]) {
+  override def toString: String = "[" + leaderIsrAndControllerEpochOpt + "," + 
exceptionOpt + "]"
+}
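
The batch class above chains callbacks when the same partition is added twice, so every replica-level state change is still notified once the single conditional ZK write for that partition succeeds. A small self-contained sketch of that chaining with invented types (not the PR's code):

import scala.collection.mutable

object CallbackChainingSketch {
  final case class PendingUpdate(newValue: String, expectedVersion: Int, onSuccess: List[String => Unit])

  private val updates = mutable.HashMap.empty[String, PendingUpdate]

  def add(partition: String, newValue: String, expectedVersion: Int, onSuccess: String => Unit): Unit = {
    // Append to the existing callback list instead of replacing it.
    val callbacks = updates.get(partition).map(_.onSuccess :+ onSuccess).getOrElse(List(onSuccess))
    updates += partition -> PendingUpdate(newValue, expectedVersion, callbacks)
  }

  // Invoked from the ZooKeeper completion callback once the conditional write succeeded.
  def complete(partition: String): Unit =
    updates.remove(partition).foreach(update => update.onSuccess.foreach(cb => cb(update.newValue)))
}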
diff --git a/core/src/main/scala/kafka/utils/ReplicationUtils.scala 
b/core/src/main/scala/kafka/utils/ReplicationUtils.scala
index 31e8a92cf0c..e97ccbb72fb 100644
--- a/core/src/main/scala/kafka/utils/ReplicationUtils.scala
+++ b/core/src/main/scala/kafka/utils/ReplicationUtils.scala
@@ -17,10 +17,13 @@
 
 package kafka.utils
 
+import java.util.concurrent.CountDownLatch
+
 import kafka.api.LeaderAndIsr
 import kafka.common.TopicAndPartition
-import kafka.controller.{IsrChangeNotificationListener, 
LeaderIsrAndControllerEpoch}
+import kafka.controller.{ZkLeaderAndIsrUpdate, ZkLeaderAndIsrReadResult, 
ZkLeaderAndIsrUpdateResult, ZkLeaderAndIsrUpdateBatch, 
IsrChangeNotificationListener, LeaderIsrAndControllerEpoch}
 import kafka.utils.ZkUtils._
+import org.apache.zookeeper.ZooKeeper
 import org.apache.zookeeper.data.Stat
 
 import scala.collection._
@@ -39,6 +42,58 @@ object ReplicationUtils extends Logging {
     updatePersistentPath
   }
 
+  /**
+   * This method asynchronously updates the leader and ISR information in a LeaderAndIsrUpdateBatch. It blocks
+   * until all the updates finish.
+   *
+   * The method takes an optional precondition check method and makes sure the precondition is met before updating the
+   * leader and ISR. This is useful for the controller. When the controller updates leader and ISR in zookeeper, it is
+   * important that the controller remains the controller during the entire update process. To guarantee that, this
+   * method first takes the current ZooKeeper session, and then checks if the current broker is the controller. If it
+   * is valid then this method will use the current ZK session for all the updates.
+   *
+   * During the leader and ISR update, if the zookeeper connection is lost or the ZooKeeper session expires, the current
+   * ZooKeeper session will throw an exception. We propagate the exception to the caller and let the caller decide
+   * whether to continue or abort.
+   *
+   * Unlike the synchronous leader and isr update call, which takes an optional checker function, the async leader and
+   * ISR update call does not verify whether the leader and isr data is unchanged. The update will fail on a zk version
+   * mismatch even if the data is the same. We do this because we cannot read data in the zookeeper event thread.
+   */
+  def asyncUpdateLeaderAndIsr(zkUtils: ZkUtils,
+                              leaderAndIsrUpdateBatch: 
ZkLeaderAndIsrUpdateBatch,
+                              controllerEpoch: Int,
+                              preconditionCheckerOpt: Option[() => Boolean] = 
None) = {
+    val unprocessedUpdates = new 
CountDownLatch(leaderAndIsrUpdateBatch.incompleteUpdates)
+    zkUtils.withCurrentSession(preconditionCheckerOpt) { zk =>
+      leaderAndIsrUpdateBatch.leaderAndIsrUpdates.foreach { case (tp, update) 
=> {
+        val path = getTopicPartitionLeaderAndIsrPath(tp.topic, tp.partition)
+        val newLeaderAndIsr = update.newLeaderAndIsr
+        val newLeaderData = zkUtils.leaderAndIsrZkData(newLeaderAndIsr, 
controllerEpoch)
+        zkUtils.asyncConditionalUpdatePersistentPath(path, newLeaderData, 
update.expectZkVersion,
+          (updateSucceeded, updatedZkVersion) => {
+            // Remove the successfully updated partitions
+            // We do not handle failure and retry here, to keep the ZooKeeper event thread lightweight. The caller is
+            // responsible for checking whether the update batch is finished or a retry is needed.
+            try {
+              trace(s"Received LeaderAndIsr update callback of update 
$newLeaderAndIsr for $tp. " +
+                s"UpdateSucceeded = $updateSucceeded")
+              if (updateSucceeded) {
+                leaderAndIsrUpdateBatch.completeLeaderAndIsrUpdate(tp)
+                update.onSuccessCallbacks.foreach { case onSuccess =>
+                  onSuccess(tp, new 
ZkLeaderAndIsrUpdateResult(newLeaderAndIsr, updatedZkVersion))
+                }
+              }
+            } finally {
+              unprocessedUpdates.countDown()
+            }
+          }, Some(zk))
+      }}
+    }
+    unprocessedUpdates.await()
+  }
+
   def propagateIsrChanges(zkUtils: ZkUtils, isrChangeSet: 
Set[TopicAndPartition]): Unit = {
     val isrChangeNotificationPath: String = 
zkUtils.createSequentialPersistentPath(
       ZkUtils.IsrChangeNotificationPath + "/" + IsrChangeNotificationPrefix,
@@ -75,6 +130,34 @@ object ReplicationUtils extends Logging {
     leaderAndIsrOpt.flatMap(leaderAndIsrStr => 
parseLeaderAndIsr(leaderAndIsrStr, leaderAndIsrPath, stat))
   }
 
+  def asyncGetLeaderIsrAndEpochForPartitions(zkUtils: ZkUtils,
+                                             partitions: 
Set[TopicAndPartition]): Map[TopicAndPartition, ZkLeaderAndIsrReadResult] = {
+    val unprocessedReads = new CountDownLatch(partitions.size)
+    val zkLeaderAndIsrReadResults = new mutable.HashMap[TopicAndPartition, 
ZkLeaderAndIsrReadResult]
+    partitions.foreach { case tap =>
+      val leaderAndIsrPath = getTopicPartitionLeaderAndIsrPath(tap.topic, 
tap.partition)
+      zkUtils.asyncReadDataMaybeNull(leaderAndIsrPath, new ZkReadCallback {
+        override def handle(dataOpt: Option[String], stat: Stat, exceptionOpt: 
Option[Exception]): Unit = {
+          try {
+            zkLeaderAndIsrReadResults += tap -> {
+                if (exceptionOpt.isEmpty) {
+                  val leaderIsrControllerEpochOpt =
+                    dataOpt.flatMap(leaderAndIsrStr => 
parseLeaderAndIsr(leaderAndIsrStr, leaderAndIsrPath, stat))
+                  new ZkLeaderAndIsrReadResult(leaderIsrControllerEpochOpt, 
None)
+                }
+                else
+                  new ZkLeaderAndIsrReadResult(None, exceptionOpt)
+              }
+          } finally {
+            unprocessedReads.countDown()
+          }
+        }
+      })
+    }
+    unprocessedReads.await()
+    zkLeaderAndIsrReadResults
+  }
+
   private def parseLeaderAndIsr(leaderAndIsrStr: String, path: String, stat: 
Stat)
       : Option[LeaderIsrAndControllerEpoch] = {
     Json.parseFull(leaderAndIsrStr).flatMap {m =>
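
Both asyncGetLeaderIsrAndEpochForPartitions and asyncUpdateLeaderAndIsr above follow the same pattern: fire one asynchronous ZooKeeper request per item, count a latch down in every completion callback, and block until all callbacks have run. A minimal sketch of that fan-out, assuming a hypothetical async read API:

import java.util.concurrent.CountDownLatch
import scala.collection.mutable

object AsyncFanOutSketch {
  def readAll[K, V](keys: Set[K])(asyncRead: (K, V => Unit) => Unit): Map[K, V] = {
    val latch = new CountDownLatch(keys.size)
    val results = mutable.HashMap.empty[K, V]
    keys.foreach { key =>
      asyncRead(key, (value: V) => {
        // The completion callback records the result and always counts the latch down.
        try results.synchronized { results += key -> value }
        finally latch.countDown()
      })
    }
    latch.await() // block until every callback has fired
    results.synchronized(results.toMap)
  }
}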
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala 
b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 49d3cfaaf8b..796aaba7b95 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -17,7 +17,7 @@
 
 package kafka.utils
 
-import java.util.concurrent.CountDownLatch
+import java.util.concurrent.{Callable, CountDownLatch}
 
 import kafka.admin._
 import kafka.api.{ApiVersion, KAFKA_0_10_0_IV0, LeaderAndIsr}
@@ -29,10 +29,10 @@ import kafka.server.ConfigType
 import kafka.utils.ZkUtils._
 import org.I0Itec.zkclient.exception.{ZkBadVersionException, ZkException, 
ZkMarshallingError, ZkNoNodeException, ZkNodeExistsException}
 import org.I0Itec.zkclient.serialize.ZkSerializer
-import org.I0Itec.zkclient.{ZkClient, ZkConnection}
+import org.I0Itec.zkclient.{IZkConnection, ZkConnection}
 import org.apache.kafka.common.config.ConfigException
 import org.apache.kafka.common.protocol.SecurityProtocol
-import org.apache.zookeeper.AsyncCallback.{DataCallback, StringCallback}
+import org.apache.zookeeper.AsyncCallback.{DataCallback, StatCallback, 
StringCallback}
 import org.apache.zookeeper.KeeperException.Code
 import org.apache.zookeeper.data.{ACL, Stat}
 import org.apache.zookeeper.{CreateMode, KeeperException, ZooDefs, ZooKeeper}
@@ -465,6 +465,28 @@ class ZkUtils(val zkClient: ZkClient,
     }
   }
 
+  /**
+   * Asynchronously makes a conditional update on a persistent path. Unlike the synchronous update call, this method does
+   * not take an optional checker function. It is the caller's responsibility 
to refresh the data in ZK and decide
+   * whether to continue or abandon the update.
+   * @param path The persistent path to update
+   * @param data The data to write
+   * @param expectVersion The expected zkVersion
+   * @param resultHandler A method that takes (updateSucceeded, newZkVersion) 
as argument. This method will be fired
+   *                      as a callback when the update finishes.
+   * @param zkOpt the zookeeper session to use for this update.
+   */
+  def asyncConditionalUpdatePersistentPath(path: String,
+                                           data: String,
+                                           expectVersion: Int,
+                                           resultHandler: (Boolean, Int) => Unit,
+                                           zkOpt: Option[ZooKeeper] = None) = {
+    val callbackWithData = new ZkWriteCallbackWithData(data, expectVersion) {
+      override def handle(updateSucceeded: Boolean, newZkVersion: Int) = resultHandler(updateSucceeded, newZkVersion)
+    }
+    zkClient.asyncWriteDataAndReturnStat(path, data, callbackWithData, expectVersion, zkOpt)
+  }
+
   /**
    * Conditional update the persistent path data, return (true, newVersion) if it succeeds, otherwise (the current
    * version is not the expected version, etc.) return (false, -1). If path doesn't exist, throws ZkNoNodeException
@@ -554,6 +576,10 @@ class ZkUtils(val zkClient: ZkClient,
     dataAndStat
   }
 
+  def asyncReadDataMaybeNull(path: String, callback: DataCallback) = {
+    zkClient.asyncReadData(path, callback)
+  }
+
   def getChildren(path: String): Seq[String] = {
     import scala.collection.JavaConversions._
     // triggers implicit conversion from java list to scala Seq
@@ -861,6 +887,13 @@ class ZkUtils(val zkClient: ZkClient,
     }
   }
 
+  def withCurrentSession(preconditionChecker: Option[() => Boolean] = None)(f: ZooKeeper => Unit) {
+    val currentZk = zkClient.currentZk
+    if (!preconditionChecker.fold(true)(checker => checker()))
+      throw new IllegalStateException("The pre-check failed.")
+    f(currentZk)
+  }
+
   def close() {
     if(zkClient != null) {
       zkClient.close()
@@ -1112,3 +1145,67 @@ class ZKCheckedEphemeral(path: String,
     }
   }
 }
+
+private[kafka] class ZkClient(zkConnection: IZkConnection, connectionTimeout: Int, zkSerializer: ZkSerializer)
+  extends org.I0Itec.zkclient.ZkClient(zkConnection, connectionTimeout, zkSerializer) {
+
+  def this(zkServers: String, sessionTimeout: Int, connectionTimeout: Int, zkSerializer: ZkSerializer) {
+    this(new ZkConnection(zkServers, sessionTimeout), connectionTimeout, zkSerializer)
+  }
+
+  def asyncWriteDataAndReturnStat(path: String,
+                                  data: String,
+                                  callbackWithData: StatCallback,
+                                  expectedVersion: Int,
+                                  zkOpt: Option[ZooKeeper] = None) = {
+    val serializedData: Array[Byte] = ZKStringSerializer.serialize(data)
+    val zk = zkOpt.getOrElse(currentZk)
+    zk.setData(path, serializedData, expectedVersion, callbackWithData, null)
+  }
+  
+  def asyncReadData(path: String, callback: DataCallback) {
+    retryUntilConnected(new Callable[Object] {
+      def call: Object = {
+        currentZk.getData(path, false, callback, null)
+        null
+      }
+    })
+  }
+
+  def currentZk = _connection.asInstanceOf[ZkConnection].getZookeeper
+}
+
+private[kafka] abstract class ZkReadCallback extends DataCallback {
+  override def processResult(rc: Int, path: String, ctx: Object, data: Array[Byte], stat: Stat) {
+    val code = Code.get(rc)
+    val dataOpt = code match {
+      case Code.OK => Some(ZKStringSerializer.deserialize(data).asInstanceOf[String])
+      case _ => None
+    }
+    // Mimic the synchronous zkClient behavior: surface an exception for any code other than OK or NONODE.
+    val exceptionOpt = if (code != Code.OK && code != Code.NONODE) Some(KeeperException.create(code)) else None
+    handle(dataOpt, stat, exceptionOpt)
+  }
+
+  def handle(dataOpt: Option[String], stat: Stat, exceptionOpt: Option[Exception])
+}
+
+private[kafka] abstract class ZkWriteCallbackWithData(val data: String,
+                                                      val expectVersion: Int)
+  extends StatCallback with Logging {
+
+  override def processResult(rc: Int, path: String, ctx: Object, stat: Stat) {
+    val code = Code.get(rc)
+    trace(s"Received return code $code for update of zk path $path")
+    val (updateSucceeded, newZkVersion) = code match {
+      case Code.OK => (true, stat.getVersion)
+      case _ =>
+        warn("Conditional update of path %s with data %s and expected version 
%d failed due to %s".format(path, data,
+          expectVersion, KeeperException.create(code).getMessage))
+        (false, -1)
+    }
+    handle(updateSucceeded, newZkVersion)
+  }
+
+  def handle(updateSucceeded: Boolean, newZkVersion: Int)
+}
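
For reference, a minimal sketch of how a caller might use asyncConditionalUpdatePersistentPath while honoring the contract documented above, i.e. re-reading the node itself when the conditional update fails. The helper name, latch and retry policy are illustrative, not part of the patch.

    import java.util.concurrent.CountDownLatch
    import kafka.utils.ZkUtils

    // Illustrative caller: the async API has no checker function, so on failure the
    // caller re-reads the node and decides whether to retry or abandon the update.
    def conditionalUpdateOnce(zkUtils: ZkUtils, path: String, newData: String, expectedVersion: Int): Boolean = {
      val done = new CountDownLatch(1)
      var succeeded = false // visibility is provided by the latch's happens-before edge
      zkUtils.asyncConditionalUpdatePersistentPath(path, newData, expectedVersion,
        (updateSucceeded, _) => {
          succeeded = updateSucceeded
          done.countDown()
        })
      done.await()
      if (!succeeded) {
        // Refresh the data and version; the caller decides whether to retry with stat.getVersion.
        val (currentDataOpt, stat) = zkUtils.readDataMaybeNull(path)
      }
      succeeded
    }
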
diff --git a/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala b/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala
index 818229c2510..816c7188b70 100644
--- a/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala
@@ -18,9 +18,8 @@
 package kafka.consumer
 
 import org.easymock.EasyMock
-import org.I0Itec.zkclient.ZkClient
 import org.apache.zookeeper.data.Stat
-import kafka.utils.{TestUtils, Logging, ZkUtils, Json}
+import kafka.utils.{ZkClient, TestUtils, Logging, ZkUtils, Json}
 import org.junit.Assert._
 import kafka.common.TopicAndPartition
 import kafka.consumer.PartitionAssignorTest.StaticSubscriptionInfo
diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
index b725d8b59ee..8ffad86692d 100755
--- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
@@ -251,6 +251,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
 
     produceMessage(servers, topic, "third")
     waitUntilMetadataIsPropagated(servers, topic, partitionId)
+    Thread.sleep(1000)
    servers.filter(server => server.config.brokerId == leaderId).map(server => shutdownServer(server))
 
     // verify clean leader transition to ISR follower
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index a5a8df1e32a..83a999342f4 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -25,8 +25,7 @@ import kafka.api.{FetchResponsePartitionData, PartitionFetchInfo}
 import kafka.cluster.Broker
 import kafka.common.TopicAndPartition
 import kafka.message.{ByteBufferMessageSet, Message}
-import kafka.utils.{MockScheduler, MockTime, TestUtils, ZkUtils}
-import org.I0Itec.zkclient.ZkClient
+import kafka.utils.{ZkClient, MockScheduler, MockTime, TestUtils, ZkUtils}
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.LeaderAndIsrRequest
diff --git a/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala b/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
index d4ddb2f7ced..4b7fb2acd71 100644
--- a/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
@@ -17,7 +17,7 @@
 
 package kafka.utils
 
-import kafka.controller.LeaderIsrAndControllerEpoch
+import kafka.controller.{ZkLeaderAndIsrUpdateBatch, LeaderIsrAndControllerEpoch}
 import kafka.server.{ReplicaFetcherManager, KafkaConfig}
 import kafka.api.LeaderAndIsr
 import kafka.zk.ZooKeeperTestHarness
@@ -94,6 +94,57 @@ class ReplicationUtilsTest extends ZooKeeperTestHarness {
       "my-topic-test", partitionId, newLeaderAndIsr3, controllerEpoch, zkVersion + 1)
     assertFalse(updateSucceeded3)
     assertEquals(newZkVersion3,-1)
+
+    //test async update leader and isr
+    val topicAndPartition = TopicAndPartition("my-topic-test", partitionId)
+    val leaderAndIsrUpdateBatch = new ZkLeaderAndIsrUpdateBatch(zkUtils)
+
+    // Test regular update
+    var updateSucceeded4 = false
+    var newZkVersion4 = -1
+    val newLeaderAndIsr4 = new LeaderAndIsr(brokerId, leaderEpoch + 1, replicas, zkVersion = 2)
+    leaderAndIsrUpdateBatch.addLeaderAndIsrUpdate(topicAndPartition, newLeaderAndIsr4, expectZkVersion = 1,
+      (_, updateResult) => {
+        updateSucceeded4 = true
+        newZkVersion4 = updateResult.newZkVersion
+      }
+    )
+    leaderAndIsrUpdateBatch.writeLeaderAndIsrUpdateToZk(controllerEpoch)
+    assertTrue(updateSucceeded4)
+    assertEquals(2, newZkVersion4)
+    assertEquals(0, leaderAndIsrUpdateBatch.incompleteUpdates)
+
+    // Test mismatched zkversion with same data
+    var updateSucceeded5 = false
+    var newZkVersion5 = -1
+    val newLeaderAndIsr5 = new LeaderAndIsr(brokerId, leaderEpoch + 1, replicas, zkVersion = 3)
+    leaderAndIsrUpdateBatch.addLeaderAndIsrUpdate(topicAndPartition, newLeaderAndIsr5, expectZkVersion = 3,
+      (_, updateResult) => {
+        updateSucceeded5 = true
+        newZkVersion5 = updateResult.newZkVersion
+      }
+    )
+    leaderAndIsrUpdateBatch.writeLeaderAndIsrUpdateToZk(controllerEpoch)
+    assertFalse(updateSucceeded5)
+    assertEquals(-1, newZkVersion5)
+    assertEquals(1, leaderAndIsrUpdateBatch.incompleteUpdates)
+    assertTrue(leaderAndIsrUpdateBatch.containsPartition(topicAndPartition))
+
+    // Test mismatched zkversion with different data
+    var updateSucceeded6 = false
+    var newZkVersion6 = -1
+    val newLeaderAndIsr6 = new LeaderAndIsr(brokerId, leaderEpoch + 2, replicas, zkVersion = 3)
+    leaderAndIsrUpdateBatch.addLeaderAndIsrUpdate(topicAndPartition, newLeaderAndIsr6, expectZkVersion = 3,
+      (_, updateResult) => {
+        updateSucceeded6 = true
+        newZkVersion6 = updateResult.newZkVersion
+      }
+    )
+    leaderAndIsrUpdateBatch.writeLeaderAndIsrUpdateToZk(controllerEpoch)
+    assertFalse(updateSucceeded6)
+    assertEquals(-1, newZkVersion6)
+    assertEquals(1, leaderAndIsrUpdateBatch.incompleteUpdates)
+    assertTrue(leaderAndIsrUpdateBatch.containsPartition(topicAndPartition))
   }
 
   @Test
diff --git a/core/src/test/scala/unit/kafka/zk/ZkClientAsyncOperationTest.scala b/core/src/test/scala/unit/kafka/zk/ZkClientAsyncOperationTest.scala
new file mode 100644
index 00000000000..660ca463b5d
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/zk/ZkClientAsyncOperationTest.scala
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.zk
+
+import kafka.utils.{ZkReadCallback, TestUtils}
+import org.apache.zookeeper.data.Stat
+import org.junit.Test
+import org.junit.Assert.{assertEquals, assertFalse, assertTrue}
+
+class ZkClientAsyncOperationTest extends ZooKeeperTestHarness {
+
+  @Test
+  def testAsyncWrite() {
+    val numPaths = 1000
+    val paths = (0 until numPaths).map { i => {
+      val path = "/testPath" + i
+      zkUtils.createPersistentPath(path, i.toString)
+      val (dataOpt, stat) = zkUtils.readDataMaybeNull(path)
+      assertEquals(i.toString, dataOpt.get)
+      val zkVersion = stat.getVersion
+      assertEquals(0, zkVersion)
+      path
+    }}
+
+    var callbackFired = 0
+    var errorInCallback = false
+    def handleResult(updateSucceeded: Boolean, newZkVersion: Int) = {
+      if(!updateSucceeded)
+        errorInCallback = true
+      if (newZkVersion != 1)
+        errorInCallback = true
+      callbackFired += 1
+    }
+    paths.foreach { path =>
+      zkUtils.asyncConditionalUpdatePersistentPath(path, "-1", 0, handleResult)
+    }
+    TestUtils.waitUntilTrue(() => callbackFired == numPaths, "Callback did not fire before timeout.")
+    assertFalse(errorInCallback)
+    paths.foreach { path =>
+      val (newData, newStat) = zkUtils.readDataMaybeNull(path)
+      assertEquals("-1", newData.get)
+      assertEquals(1, newStat.getVersion)
+    }
+  }
+
+  @Test
+  def testAsyncRead() {
+    val numPaths = 1000
+    val paths = (0 until numPaths).map { i => {
+      val path = "/testPath" + i
+      zkUtils.createPersistentPath(path, i.toString)
+      val (dataOpt, stat) = zkUtils.readDataMaybeNull(path)
+      assertEquals(i.toString, dataOpt.get)
+      val zkVersion = stat.getVersion
+      assertEquals(0, zkVersion)
+      path
+    }}
+
+    var writeCallbackFired = 0
+    def handleWriteResult(updateSucceeded: Boolean, newZkVersion: Int) = {
+      assertTrue(updateSucceeded)
+      assertEquals(1, newZkVersion)
+      writeCallbackFired += 1
+    }
+    paths.foreach { path =>
+      zkUtils.asyncConditionalUpdatePersistentPath(path, path + "data", 0, handleWriteResult)
+    }
+    TestUtils.waitUntilTrue(() => writeCallbackFired == numPaths, "Callback did not fire before timeout.")
+
+    var readCallbackFired = 0
+    var errorInCallback = false
+    paths.foreach { path => {
+      zkUtils.asyncReadDataMaybeNull(path, new ZkReadCallback() {
+        override def handle(dataOpt: Option[String], stat: Stat, exceptionOpt: Option[Exception]) = {
+          if (!exceptionOpt.isEmpty)
+            errorInCallback = true
+          if (path + "data" != dataOpt.get)
+            errorInCallback = true
+          if (stat.getVersion != 1)
+            errorInCallback = true
+          readCallbackFired += 1
+        }
+      })
+    }}
+    TestUtils.waitUntilTrue(() => readCallbackFired == numPaths, "Callback did not fire before timeout.")
+    assertFalse(errorInCallback)
+  }
+
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> Speed up controlled shutdown.
> -----------------------------
>
>                 Key: KAFKA-3436
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3436
>             Project: Kafka
>          Issue Type: Improvement
>    Affects Versions: 0.9.0.0
>            Reporter: Jiangjie Qin
>            Assignee: Jiangjie Qin
>
> Currently, rolling bouncing a Kafka cluster with tens of thousands of partitions 
> can take a very long time (~2 min for each broker with ~5000 partitions/broker in 
> our environment). The majority of that time is spent shutting down each broker. 
> The time to shut down a broker usually includes the following parts:
> T1: During controlled shutdown, people usually want to make sure there are no 
> under-replicated partitions. So shutting down a broker during a rolling bounce 
> has to wait for the previously restarted broker to catch up. This is T1.
> T2: The time to send the controlled shutdown request and receive the controlled 
> shutdown response. Currently a controlled shutdown request triggers many 
> LeaderAndIsrRequests and UpdateMetadataRequests, and also involves many 
> ZooKeeper updates performed serially.
> T3: The actual time to shut down all the components. It is usually small 
> compared with T1 and T2.
> T1 is related to:
> A) the inbound throughput on the cluster, and 
> B) the "down" time of the broker (the time between the replica fetchers stopping 
> and the replica fetchers restarting).
> The larger the traffic is, or the longer the broker has stopped fetching, the 
> longer it takes for the broker to catch up and get back into the ISR, and 
> therefore the longer T1 is. Assume:
> * the inbound network traffic is X bytes/second on a broker
> * the time T1.B (the "down" time) mentioned above is T
> Theoretically it will take (X * T) / (NetworkBandwidth - X) = 
> InboundNetworkUtilization * T / (1 - InboundNetworkUtilization) for the broker 
> to catch up after the restart, as the worked example below illustrates. While X 
> is out of our control, T is largely related to T2.
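> As a rough illustration with hypothetical numbers (not measurements): with 
> roughly 1250 MB/s of usable inbound bandwidth (~10 Gbps), X = 250 MB/s of 
> inbound traffic (20% utilization) and T = 60 seconds of down time, the catch-up 
> time comes out to about 15 seconds:
>     // Hypothetical numbers, purely illustrative
>     val networkBandwidth = 1250.0 // MB/s, roughly a 10 Gbps NIC
>     val x = 250.0                 // MB/s of inbound traffic (20% utilization)
>     val t = 60.0                  // seconds of broker "down" time
>     val catchUpSeconds = (x * t) / (networkBandwidth - x) // (250 * 60) / 1000 = 15.0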
> The purpose of this ticket is to reduce T2 by:
> 1. Batching the LeaderAndIsrRequests and UpdateMetadataRequests sent during 
> controlled shutdown.
> 2. Using async ZooKeeper writes to pipeline the ZooKeeper updates (see the 
> sketch after this list). According to the ZooKeeper wiki 
> (https://wiki.apache.org/hadoop/ZooKeeper/Performance), a 3-node ZK cluster 
> should be able to handle about 20K writes per second (1 KB each). So if we use 
> async writes, we will likely be able to reduce the ZooKeeper update time to the 
> low-seconds or even sub-second level.
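> A minimal sketch of the contrast between the current serial updates and the 
> pipelined async updates described in item 2. The helper functions and the 
> updates map are hypothetical; only the two ZkUtils calls come from the existing 
> code and this patch.
>     import java.util.concurrent.CountDownLatch
>     import kafka.utils.ZkUtils
>
>     // Serial: each update waits for a full ZooKeeper round trip before the next starts.
>     def updateSerially(zkUtils: ZkUtils, updates: Map[String, (String, Int)]): Unit =
>       updates.foreach { case (path, (data, version)) =>
>         zkUtils.conditionalUpdatePersistentPath(path, data, version)
>       }
>
>     // Pipelined: all updates are in flight at once, so the total time is roughly one
>     // round trip plus server-side processing instead of N round trips.
>     def updatePipelined(zkUtils: ZkUtils, updates: Map[String, (String, Int)]): Unit = {
>       val remaining = new CountDownLatch(updates.size)
>       updates.foreach { case (path, (data, version)) =>
>         zkUtils.asyncConditionalUpdatePersistentPath(path, data, version, (_, _) => remaining.countDown())
>       }
>       remaining.await()
>     }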



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
