[ https://issues.apache.org/jira/browse/KAFKA-3436?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16300831#comment-16300831 ]
ASF GitHub Bot commented on KAFKA-3436:
---------------------------------------
ijuma closed pull request #1149: KAFKA-3436: Speed up controlled shutdown.
URL: https://github.com/apache/kafka/pull/1149
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala
b/core/src/main/scala/kafka/controller/KafkaController.scala
index 103f6cf575d..41e1d9d1dc0 100755
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -17,32 +17,29 @@
package kafka.controller
import java.util
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.locks.ReentrantLock
-import org.apache.kafka.common.errors.{BrokerNotAvailableException,
ControllerMovedException}
-import org.apache.kafka.common.protocol.ApiKeys
-import org.apache.kafka.common.requests.{AbstractRequest,
AbstractRequestResponse}
-
-import scala.collection._
import com.yammer.metrics.core.Gauge
-import java.util.concurrent.TimeUnit
-import kafka.admin.AdminUtils
import kafka.admin.PreferredReplicaLeaderElectionCommand
import kafka.api._
import kafka.cluster.Broker
import kafka.common._
-import kafka.log.LogConfig
-import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
+import kafka.metrics.{KafkaMetricsGroup, KafkaTimer}
+import kafka.server._
+import kafka.utils.CoreUtils._
import kafka.utils.ZkUtils._
import kafka.utils._
-import kafka.utils.CoreUtils._
-import org.apache.zookeeper.Watcher.Event.KeeperState
+import org.I0Itec.zkclient.exception.{ZkNoNodeException, ZkNodeExistsException}
+import org.I0Itec.zkclient.{IZkChildListener, IZkDataListener,
IZkStateListener, ZkClient}
+import org.apache.kafka.common.errors.{BrokerNotAvailableException,
ControllerMovedException}
import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.common.requests.{AbstractRequest,
AbstractRequestResponse}
import org.apache.kafka.common.utils.Time
-import org.I0Itec.zkclient.{IZkChildListener, IZkDataListener,
IZkStateListener, ZkClient, ZkConnection}
-import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException}
-import java.util.concurrent.locks.ReentrantLock
-import kafka.server._
-import kafka.common.TopicAndPartition
+import org.apache.zookeeper.Watcher.Event.KeeperState
+
+import scala.collection._
class ControllerContext(val zkUtils: ZkUtils,
val zkSessionTimeout: Int) {
@@ -222,6 +219,8 @@ class KafkaController(val config : KafkaConfig, zkUtils:
ZkUtils, val brokerStat
"id_%d-host_%s-port_%d".format(config.brokerId,
controllerListener.get.host, controllerListener.get.port)
}
+ def isValidController = zkUtils.getController == config.brokerId
+
/**
* On clean shutdown, the controller first determines the partitions that the
* shutting down broker leads, and moves leadership of those partitions to
another broker
@@ -254,43 +253,40 @@ class KafkaController(val config : KafkaConfig, zkUtils:
ZkUtils, val brokerStat
.map(topicAndPartition => (topicAndPartition,
controllerContext.partitionReplicaAssignment(topicAndPartition).size))
}
- allPartitionsAndReplicationFactorOnBroker.foreach {
- case(topicAndPartition, replicationFactor) =>
- // Move leadership serially to relinquish lock.
- inLock(controllerContext.controllerLock) {
+ val leadersOnBroker = new mutable.HashSet[TopicAndPartition]
+ val followersOnBroker = new mutable.HashSet[PartitionAndReplica]
+ inLock(controllerContext.controllerLock) {
+ allPartitionsAndReplicationFactorOnBroker.foreach {
+ case (topicAndPartition, replicationFactor) =>
controllerContext.partitionLeadershipInfo.get(topicAndPartition).foreach {
currLeaderIsrAndControllerEpoch =>
if (replicationFactor > 1) {
if (currLeaderIsrAndControllerEpoch.leaderAndIsr.leader == id)
{
- // If the broker leads the topic partition, transition the
leader and update isr. Updates zk and
- // notifies all affected brokers
-
partitionStateMachine.handleStateChanges(Set(topicAndPartition),
OnlinePartition,
- controlledShutdownPartitionLeaderSelector)
+ leadersOnBroker += topicAndPartition
} else {
- // Stop the replica first. The state change below initiates
ZK changes which should take some time
- // before which the stop replica request should be completed
(in most cases)
- try {
- brokerRequestBatch.newBatch()
-
brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id),
topicAndPartition.topic,
- topicAndPartition.partition, deletePartition = false)
- brokerRequestBatch.sendRequestsToBrokers(epoch)
- } catch {
- case e : IllegalStateException => {
- // Resign if the controller is in an illegal state
- error("Forcing the controller to resign")
- brokerRequestBatch.clear()
- controllerElector.resign()
-
- throw e
- }
- }
- // If the broker is a follower, updates the isr in ZK and
notifies the current leader
-
replicaStateMachine.handleStateChanges(Set(PartitionAndReplica(topicAndPartition.topic,
- topicAndPartition.partition, id)), OfflineReplica)
+ followersOnBroker +=
PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition, id)
}
}
}
- }
+ }
+ }
+
+    // We group the changes into batches of 1000 partitions to avoid holding the controller lock for too long.
+ leadersOnBroker.grouped(1000).foreach { leaderGroup =>
+ inLock(controllerContext.controllerLock) {
+ // If the broker leads the topic partitions, transition all the
leaders and update isr. Updates zk and
+ // notifies all affected brokers
+ partitionStateMachine.handleStateChanges(leaderGroup,
OnlinePartition,
+ controlledShutdownPartitionLeaderSelector)
+ }
}
+
+ followersOnBroker.grouped(1000).foreach { followerGroup =>
+ inLock(controllerContext.controllerLock) {
+ // If the broker is a follower of the partitions, updates the isr in
ZK and notifies the current leaders
+ replicaStateMachine.handleStateChanges(followerGroup, OfflineReplica)
+ }
+ }
+
def replicatedPartitionsBrokerLeads() =
inLock(controllerContext.controllerLock) {
trace("All leaders = " +
controllerContext.partitionLeadershipInfo.mkString(","))
controllerContext.partitionLeadershipInfo.filter {
@@ -821,6 +817,30 @@ class KafkaController(val config : KafkaConfig, zkUtils:
ZkUtils, val brokerStat
controllerContext.controllerChannelManager.startup()
}
+  /**
+   * The method reads leader and ISR info from zookeeper. It is used before invoking state changes to make sure the
+   * controller is using up-to-date information.
+   */
+  def refreshLeaderAndIsrFromZk(topicAndPartitions: Set[TopicAndPartition]): Map[TopicAndPartition, ZkLeaderAndIsrReadResult] = {
+    val leaderAndIsrReadResults = ReplicationUtils.asyncGetLeaderIsrAndEpochForPartitions(zkUtils, topicAndPartitions)
+    leaderAndIsrReadResults.foreach { case (tap, leaderAndIsrReadResult) =>
+      leaderAndIsrReadResult.leaderIsrAndControllerEpochOpt match {
+        case Some(leaderIsrAndControllerEpoch) =>
+          if (leaderIsrAndControllerEpoch.controllerEpoch > epoch)
+            throw new IllegalStateException(s"Leader and isr path written by another controller. This probably " +
+              s"means the current controller with epoch $epoch went through a soft failure and another " +
+              s"controller was elected with epoch ${leaderIsrAndControllerEpoch.controllerEpoch}.")
+        case None =>
+      }
+      leaderAndIsrReadResult.exceptionOpt match {
+        case Some(exception) =>
+          throw new KafkaException(s"Asynchronously reading LeaderAndIsr information for $tap", exception)
+        case None =>
+      }
+    }
+    leaderAndIsrReadResults
+  }
+
def updateLeaderAndIsrCache(topicAndPartitions: Set[TopicAndPartition] =
controllerContext.partitionReplicaAssignment.keySet) {
val leaderAndIsrInfo =
zkUtils.getPartitionLeaderAndIsrForTopics(zkUtils.zkClient, topicAndPartitions)
for((topicPartition, leaderIsrAndControllerEpoch) <- leaderAndIsrInfo)
@@ -1040,73 +1060,6 @@ class KafkaController(val config : KafkaConfig, zkUtils:
ZkUtils, val brokerStat
}
}
- /**
- * Removes a given partition replica from the ISR; if it is not the current
- * leader and there are sufficient remaining replicas in ISR.
- * @param topic topic
- * @param partition partition
- * @param replicaId replica Id
- * @return the new leaderAndIsr (with the replica removed if it was present),
- * or None if leaderAndIsr is empty.
- */
- def removeReplicaFromIsr(topic: String, partition: Int, replicaId: Int):
Option[LeaderIsrAndControllerEpoch] = {
- val topicAndPartition = TopicAndPartition(topic, partition)
- debug("Removing replica %d from ISR %s for partition %s.".format(replicaId,
-
controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.isr.mkString(","),
topicAndPartition))
- var finalLeaderIsrAndControllerEpoch: Option[LeaderIsrAndControllerEpoch]
= None
- var zkWriteCompleteOrUnnecessary = false
- while (!zkWriteCompleteOrUnnecessary) {
- // refresh leader and isr from zookeeper again
- val leaderIsrAndEpochOpt =
ReplicationUtils.getLeaderIsrAndEpochForPartition(zkUtils, topic, partition)
- zkWriteCompleteOrUnnecessary = leaderIsrAndEpochOpt match {
- case Some(leaderIsrAndEpoch) => // increment the leader epoch even if
the ISR changes
- val leaderAndIsr = leaderIsrAndEpoch.leaderAndIsr
- val controllerEpoch = leaderIsrAndEpoch.controllerEpoch
- if(controllerEpoch > epoch)
- throw new StateChangeFailedException("Leader and isr path written
by another controller. This probably" +
- "means the current controller with epoch %d went through a soft
failure and another ".format(epoch) +
- "controller was elected with epoch %d. Aborting state change by
this controller".format(controllerEpoch))
- if (leaderAndIsr.isr.contains(replicaId)) {
- // if the replica to be removed from the ISR is also the leader,
set the new leader value to -1
- val newLeader = if (replicaId == leaderAndIsr.leader)
LeaderAndIsr.NoLeader else leaderAndIsr.leader
- var newIsr = leaderAndIsr.isr.filter(b => b != replicaId)
-
- // if the replica to be removed from the ISR is the last surviving
member of the ISR and unclean leader election
- // is disallowed for the corresponding topic, then we must
preserve the ISR membership so that the replica can
- // eventually be restored as the leader.
- if (newIsr.isEmpty && !LogConfig.fromProps(config.originals,
AdminUtils.fetchEntityConfig(zkUtils,
- ConfigType.Topic,
topicAndPartition.topic)).uncleanLeaderElectionEnable) {
- info("Retaining last ISR %d of partition %s since unclean leader
election is disabled".format(replicaId, topicAndPartition))
- newIsr = leaderAndIsr.isr
- }
-
- val newLeaderAndIsr = new LeaderAndIsr(newLeader,
leaderAndIsr.leaderEpoch + 1,
- newIsr, leaderAndIsr.zkVersion + 1)
- // update the new leadership decision in zookeeper or retry
- val (updateSucceeded, newVersion) =
ReplicationUtils.updateLeaderAndIsr(zkUtils, topic, partition,
- newLeaderAndIsr, epoch, leaderAndIsr.zkVersion)
-
- newLeaderAndIsr.zkVersion = newVersion
- finalLeaderIsrAndControllerEpoch =
Some(LeaderIsrAndControllerEpoch(newLeaderAndIsr, epoch))
- controllerContext.partitionLeadershipInfo.put(topicAndPartition,
finalLeaderIsrAndControllerEpoch.get)
- if (updateSucceeded)
- info("New leader and ISR for partition %s is
%s".format(topicAndPartition, newLeaderAndIsr.toString()))
- updateSucceeded
- } else {
- warn("Cannot remove replica %d from ISR of partition %s since it
is not in the ISR. Leader = %d ; ISR = %s"
- .format(replicaId, topicAndPartition, leaderAndIsr.leader,
leaderAndIsr.isr))
- finalLeaderIsrAndControllerEpoch =
Some(LeaderIsrAndControllerEpoch(leaderAndIsr, epoch))
- controllerContext.partitionLeadershipInfo.put(topicAndPartition,
finalLeaderIsrAndControllerEpoch.get)
- true
- }
- case None =>
- warn("Cannot remove replica %d from ISR of %s - leaderAndIsr is
empty.".format(replicaId, topicAndPartition))
- true
- }
- }
- finalLeaderIsrAndControllerEpoch
- }
-
/**
* Does not change leader or isr, but just increments the leader epoch
* @param topic topic
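For context, here is a minimal, standalone sketch (not part of the patch) of the batching-under-lock pattern the controlled shutdown change above introduces: state changes are applied in groups of 1000 partitions, taking the controller lock once per group instead of holding it for the whole shutdown. The object and helper names below are illustrative; inLock mirrors kafka.utils.CoreUtils.inLock.

import java.util.concurrent.locks.ReentrantLock

object GroupedLockExample {
  private val lock = new ReentrantLock()

  // Run `body` while holding the lock, mirroring kafka.utils.CoreUtils.inLock.
  private def inLock[T](l: ReentrantLock)(body: => T): T = {
    l.lock()
    try body
    finally l.unlock()
  }

  def main(args: Array[String]): Unit = {
    val partitions = (0 until 5000).map(p => s"demo-topic-$p")
    // Process the partitions in groups of 1000 so the lock is released between groups,
    // letting other controller work interleave with a large controlled shutdown.
    partitions.grouped(1000).foreach { group =>
      inLock(lock) {
        // In the patch this is partitionStateMachine.handleStateChanges(group, OnlinePartition, ...).
        println(s"handled state change for ${group.size} partitions")
      }
    }
  }
}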
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index ec03b84395b..180e58609f4 100755
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -21,7 +21,7 @@ import collection.JavaConversions
import collection.mutable.Buffer
import java.util.concurrent.atomic.AtomicBoolean
import kafka.api.LeaderAndIsr
-import kafka.common.{LeaderElectionNotNeededException, TopicAndPartition,
StateChangeFailedException, NoReplicaOnlineException}
+import kafka.common.{KafkaException, LeaderElectionNotNeededException,
TopicAndPartition, StateChangeFailedException, NoReplicaOnlineException}
import kafka.utils.{Logging, ReplicationUtils}
import kafka.utils.ZkUtils._
import org.I0Itec.zkclient.{IZkDataListener, IZkChildListener}
@@ -47,6 +47,7 @@ class PartitionStateMachine(controller: KafkaController)
extends Logging {
private val zkUtils = controllerContext.zkUtils
private val partitionState: mutable.Map[TopicAndPartition, PartitionState] =
mutable.Map.empty
private val brokerRequestBatch = new ControllerBrokerRequestBatch(controller)
+ private val leaderAndIsrUpdateBatch = new ZkLeaderAndIsrUpdateBatch(zkUtils)
private val hasStarted = new AtomicBoolean(false)
private val noOpPartitionLeaderSelector = new
NoOpLeaderSelector(controllerContext)
private val topicChangeListener = new TopicChangeListener()
@@ -110,21 +111,12 @@ class PartitionStateMachine(controller: KafkaController)
extends Logging {
* state. This is called on a successful controller election and on broker
changes
*/
def triggerOnlinePartitionStateChange() {
- try {
- brokerRequestBatch.newBatch()
- // try to move all partitions in NewPartition or OfflinePartition state
to OnlinePartition state except partitions
- // that belong to topics to be deleted
- for((topicAndPartition, partitionState) <- partitionState
-
if(!controller.deleteTopicManager.isTopicQueuedUpForDeletion(topicAndPartition.topic)))
{
- if(partitionState.equals(OfflinePartition) ||
partitionState.equals(NewPartition))
- handleStateChange(topicAndPartition.topic,
topicAndPartition.partition, OnlinePartition,
controller.offlinePartitionSelector,
- (new CallbackBuilder).build)
- }
- brokerRequestBatch.sendRequestsToBrokers(controller.epoch)
- } catch {
- case e: Throwable => error("Error while moving some partitions to the
online state", e)
- // TODO: It is not enough to bail out and log an error, it is important
to trigger leader election for those partitions
- }
+ val partitions = partitionState.filter { case (topicAndPartition,
partitionState) =>
+ val isBeingDeleted =
controller.deleteTopicManager.isTopicQueuedUpForDeletion(topicAndPartition.topic)
+ !isBeingDeleted && (partitionState.equals(OfflinePartition) ||
partitionState.equals(NewPartition))
+ }.keySet
+
+ handleStateChanges(partitions, OnlinePartition,
controller.offlinePartitionSelector, (new CallbackBuilder).build)
}
def partitionsInState(state: PartitionState): Set[TopicAndPartition] = {
@@ -141,11 +133,37 @@ class PartitionStateMachine(controller: KafkaController)
extends Logging {
callbacks: Callbacks = (new CallbackBuilder).build) {
info("Invoking state change to %s for partitions %s".format(targetState,
partitions.mkString(",")))
try {
- brokerRequestBatch.newBatch()
- partitions.foreach { topicAndPartition =>
- handleStateChange(topicAndPartition.topic,
topicAndPartition.partition, targetState, leaderSelector, callbacks)
- }
- brokerRequestBatch.sendRequestsToBrokers(controller.epoch)
+ val remainingParititions = collection.mutable.Set(partitions.toArray:_*)
+ do {
+        // First refresh all the LeaderAndIsr info from zookeeper.
+        // Make sure all the LeaderAndIsr information is successfully read and that this broker is still the valid controller.
+ val leaderAndIsrReadResults =
controller.refreshLeaderAndIsrFromZk(remainingParititions)
+ trace(s"Refreshed LeaderAndIsr information for partitions:
${leaderAndIsrReadResults}")
+ leaderAndIsrUpdateBatch.newBatch()
+ brokerRequestBatch.newBatch()
+ partitions.foreach { topicAndPartition =>
+ val leaderIsrAndControllerEpoch =
leaderAndIsrReadResults(topicAndPartition).leaderIsrAndControllerEpochOpt
+ handleStateChange(topicAndPartition.topic,
topicAndPartition.partition, leaderIsrAndControllerEpoch,
+ targetState, leaderSelector, callbacks)
+ }
+        // Make sure we update zk first, because the callbacks will fill in the BrokerRequestBatch.
+        try {
+          leaderAndIsrUpdateBatch.writeLeaderAndIsrUpdateToZk(controller.epoch, Some(() => controller.isValidController))
+        } catch {
+          // The exception thrown here is most likely due to a zk disconnection or session expiration. In this case
+          // we swallow the error and retry. LeaderAndIsr updates that have been sent but not yet acked might be
+          // updated again, but that does not hurt.
+          case e: Throwable => error("Error while updating LeaderAndIsr in zookeeper.", e)
+        }
+ brokerRequestBatch.sendRequestsToBrokers(controller.epoch)
+
remainingParititions.retain(leaderAndIsrUpdateBatch.containsPartition(_))
+ if (!remainingParititions.isEmpty) {
+ debug(s"The following partitions are still waiting for state change:
$remainingParititions.")
+ if (!controller.isValidController)
+ throw new StateChangeFailedException(s"Controller
${controller.config.brokerId} epoch ${controller.epoch}" +
+ s" is no longer the valid controller.")
+ }
+ } while (!remainingParititions.isEmpty)
}catch {
case e: Throwable => error("Error while moving some partitions to %s
state".format(targetState), e)
// TODO: It is not enough to bail out and log an error, it is important
to trigger state changes for those partitions
@@ -175,7 +193,10 @@ class PartitionStateMachine(controller: KafkaController)
extends Logging {
* @param partition The partition for which the state transition is invoked
* @param targetState The end state that the partition should be moved to
*/
- private def handleStateChange(topic: String, partition: Int, targetState:
PartitionState,
+ private def handleStateChange(topic: String,
+ partition: Int,
+ leaderIsrAndControllerEpoch:
Option[LeaderIsrAndControllerEpoch],
+ targetState: PartitionState,
leaderSelector: PartitionLeaderSelector,
callbacks: Callbacks) {
val topicAndPartition = TopicAndPartition(topic, partition)
@@ -201,16 +222,17 @@ class PartitionStateMachine(controller: KafkaController)
extends Logging {
case NewPartition =>
// initialize leader and isr path for new partition
initializeLeaderAndIsrForPartition(topicAndPartition)
+ partitionState.put(topicAndPartition, OnlinePartition)
+ val leader =
controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.leader
+ stateChangeLogger.trace("Controller %d epoch %d changed
partition %s from %s to %s with leader %d"
+ .format(controllerId, controller.epoch, topicAndPartition,
currState, targetState, leader))
case OfflinePartition =>
- electLeaderForPartition(topic, partition, leaderSelector)
+ electLeaderForPartition(topic, partition, OfflinePartition,
leaderIsrAndControllerEpoch, leaderSelector)
case OnlinePartition => // invoked when the leader needs to be
re-elected
- electLeaderForPartition(topic, partition, leaderSelector)
+ electLeaderForPartition(topic, partition, OnlinePartition,
leaderIsrAndControllerEpoch, leaderSelector)
case _ => // should never come here since illegal previous states
are checked above
}
- partitionState.put(topicAndPartition, OnlinePartition)
- val leader =
controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.leader
- stateChangeLogger.trace("Controller %d epoch %d changed partition %s
from %s to %s with leader %d"
- .format(controllerId, controller.epoch,
topicAndPartition, currState, targetState, leader))
+
// post: partition has a leader
case OfflinePartition =>
// pre: partition should be in New or Online state
@@ -316,49 +338,57 @@ class PartitionStateMachine(controller: KafkaController)
extends Logging {
/**
* Invoked on the OfflinePartition,OnlinePartition->OnlinePartition state
change.
* It invokes the leader election API to elect a leader for the input
offline partition
- * @param topic The topic of the offline partition
- * @param partition The offline partition
- * @param leaderSelector Specific leader selector (e.g.,
offline/reassigned/etc.)
+ * @param topic The topic of the offline
partition
+ * @param partition The offline partition
+ * @param currState The current state of the
partition.
+ * @param leaderIsrAndControllerEpochOpt The current leader and isr
information in zookeeper
+ * @param leaderSelector Specific leader selector
(e.g., offline/reassigned/etc.)
*/
- def electLeaderForPartition(topic: String, partition: Int, leaderSelector:
PartitionLeaderSelector) {
+ def electLeaderForPartition(topic: String,
+ partition: Int,
+ currState: PartitionState,
+ leaderIsrAndControllerEpochOpt:
Option[LeaderIsrAndControllerEpoch],
+ leaderSelector: PartitionLeaderSelector) {
val topicAndPartition = TopicAndPartition(topic, partition)
// handle leader election for the partitions whose leader is no longer
alive
stateChangeLogger.trace("Controller %d epoch %d started leader election
for partition %s"
.format(controllerId, controller.epoch,
topicAndPartition))
try {
- var zookeeperPathUpdateSucceeded: Boolean = false
- var newLeaderAndIsr: LeaderAndIsr = null
- var replicasForThisPartition: Seq[Int] = Seq.empty[Int]
- while(!zookeeperPathUpdateSucceeded) {
- val currentLeaderIsrAndEpoch =
getLeaderIsrAndEpochOrThrowException(topic, partition)
- val currentLeaderAndIsr = currentLeaderIsrAndEpoch.leaderAndIsr
- val controllerEpoch = currentLeaderIsrAndEpoch.controllerEpoch
- if (controllerEpoch > controller.epoch) {
- val failMsg = ("aborted leader election for partition [%s,%d] since
the LeaderAndIsr path was " +
- "already written by another controller. This probably
means that the current controller %d went through " +
- "a soft failure and another controller was elected
with epoch %d.")
- .format(topic, partition, controllerId,
controllerEpoch)
- stateChangeLogger.error("Controller %d epoch %d
".format(controllerId, controller.epoch) + failMsg)
- throw new StateChangeFailedException(failMsg)
- }
- // elect new leader or throw exception
- val (leaderAndIsr, replicas) =
leaderSelector.selectLeader(topicAndPartition, currentLeaderAndIsr)
- val (updateSucceeded, newVersion) =
ReplicationUtils.updateLeaderAndIsr(zkUtils, topic, partition,
- leaderAndIsr, controller.epoch, currentLeaderAndIsr.zkVersion)
- newLeaderAndIsr = leaderAndIsr
- newLeaderAndIsr.zkVersion = newVersion
- zookeeperPathUpdateSucceeded = updateSucceeded
- replicasForThisPartition = replicas
+
+ val currLeaderIsrAndEpoch = leaderIsrAndControllerEpochOpt.getOrElse({
+ val failMsg = "LeaderAndIsr information doesn't exist for partition %s
in %s state"
+ .format(topicAndPartition, partitionState(topicAndPartition))
+ throw new StateChangeFailedException(failMsg)
+ })
+ val currLeaderAndIsr = currLeaderIsrAndEpoch.leaderAndIsr
+ // elect new leader or throw exception
+ val (newLeaderAndIsr, replicas) =
leaderSelector.selectLeader(topicAndPartition, currLeaderAndIsr)
+ leaderAndIsrUpdateBatch.addLeaderAndIsrUpdate(topicAndPartition,
+ newLeaderAndIsr,
+ currLeaderAndIsr.zkVersion,
+
onLeaderAndIsrUpdateSuccess)
+
+ def onLeaderAndIsrUpdateSuccess(topicAndPartition: TopicAndPartition,
+ leaderAndIsrUpdateResult:
ZkLeaderAndIsrUpdateResult) = {
+ val newLeaderAndIsr = leaderAndIsrUpdateResult.leaderAndIsr
+ val newZkVersion = leaderAndIsrUpdateResult.newZkVersion
+ updateLocalState(newLeaderAndIsr, newZkVersion)
+ }
+
+ def updateLocalState(newLeaderAndIsr: LeaderAndIsr, newZkVersion: Int) =
{
+ val newLeaderIsrAndControllerEpoch = new
LeaderIsrAndControllerEpoch(newLeaderAndIsr, controller.epoch)
+ // update the leader cache
+ controllerContext.partitionLeadershipInfo.put(topicAndPartition,
newLeaderIsrAndControllerEpoch)
+ debug(s"After leader election, leader cache is updated to " +
+ s"${controllerContext.partitionLeadershipInfo.map(l => (l._1,
l._2))}")
+ val assignedReplicas =
controllerContext.partitionReplicaAssignment(TopicAndPartition(topic,
partition))
+ // store new leader and isr info in cache
+ brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicas, topic,
partition,
+ newLeaderIsrAndControllerEpoch, assignedReplicas)
+ partitionState.put(topicAndPartition, OnlinePartition)
+ stateChangeLogger.trace(s"Controller $controllerId epoch
${controller.epoch} changed partition $topicAndPartition " +
+ s"from $currState to $OnlinePartition with leader
${newLeaderAndIsr.leader}")
}
- val newLeaderIsrAndControllerEpoch = new
LeaderIsrAndControllerEpoch(newLeaderAndIsr, controller.epoch)
- // update the leader cache
- controllerContext.partitionLeadershipInfo.put(TopicAndPartition(topic,
partition), newLeaderIsrAndControllerEpoch)
- stateChangeLogger.trace("Controller %d epoch %d elected leader %d for
Offline partition %s"
- .format(controllerId, controller.epoch,
newLeaderAndIsr.leader, topicAndPartition))
- val replicas =
controllerContext.partitionReplicaAssignment(TopicAndPartition(topic,
partition))
- // store new leader and isr info in cache
-
brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasForThisPartition,
topic, partition,
- newLeaderIsrAndControllerEpoch, replicas)
} catch {
case lenne: LeaderElectionNotNeededException => // swallow
case nroe: NoReplicaOnlineException => throw nroe
@@ -367,7 +397,6 @@ class PartitionStateMachine(controller: KafkaController)
extends Logging {
stateChangeLogger.error("Controller %d epoch %d ".format(controllerId,
controller.epoch) + failMsg)
throw new StateChangeFailedException(failMsg, sce)
}
- debug("After leader election, leader cache is updated to
%s".format(controllerContext.partitionLeadershipInfo.map(l => (l._1, l._2))))
}
private def registerTopicChangeListener() = {
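For context, here is a minimal, standalone sketch (not part of the patch) of the refresh-and-retry loop that handleStateChanges now uses: attempt a batch, keep only the partitions that are still pending afterwards, and repeat while any remain and the broker is still the valid controller. attemptBatch below is a stand-in for the refresh-from-zookeeper plus asynchronous-write round trip in the diff, and stillValid for controller.isValidController.

import scala.collection.mutable

object RetryUntilDone {
  // `attemptBatch` returns the subset of `items` that is still pending after one attempt.
  def retryUntilDone[T](items: Set[T],
                        attemptBatch: Set[T] => Set[T],
                        stillValid: () => Boolean): Unit = {
    val remaining = mutable.Set(items.toSeq: _*)
    do {
      val stillPending = attemptBatch(remaining.toSet)
      // Keep only the items that did not complete in this round (a Set is also a T => Boolean).
      remaining.retain(stillPending)
      if (remaining.nonEmpty && !stillValid())
        throw new IllegalStateException("No longer the valid controller, aborting the state change")
    } while (remaining.nonEmpty)
  }

  def main(args: Array[String]): Unit = {
    var attempts = 0
    // Each attempt completes the items whose value is at most the attempt count.
    retryUntilDone[Int](Set(1, 2, 3), batch => { attempts += 1; batch.filter(_ > attempts) }, () => true)
    println(s"finished after $attempts attempts")
  }
}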
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index 2fd8b95506c..04a04c8cf0f 100755
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -16,13 +16,17 @@
*/
package kafka.controller
+import kafka.admin.AdminUtils
+import kafka.api.LeaderAndIsr
+import kafka.log.LogConfig
+import kafka.server.ConfigType
+
import collection._
import collection.JavaConversions._
import java.util.concurrent.atomic.AtomicBoolean
-import kafka.common.{TopicAndPartition, StateChangeFailedException}
+import kafka.common.{KafkaException, TopicAndPartition,
StateChangeFailedException}
import kafka.utils.{ZkUtils, ReplicationUtils, Logging}
import org.I0Itec.zkclient.IZkChildListener
-import org.apache.log4j.Logger
import kafka.controller.Callbacks._
import kafka.utils.CoreUtils._
@@ -51,6 +55,7 @@ class ReplicaStateMachine(controller: KafkaController)
extends Logging {
private val replicaState: mutable.Map[PartitionAndReplica, ReplicaState] =
mutable.Map.empty
private val brokerChangeListener = new BrokerChangeListener()
private val brokerRequestBatch = new ControllerBrokerRequestBatch(controller)
+ private val leaderAndIsrUpdateBatch = new ZkLeaderAndIsrUpdateBatch(zkUtils)
private val hasStarted = new AtomicBoolean(false)
private val stateChangeLogger = KafkaController.stateChangeLogger
@@ -110,10 +115,43 @@ class ReplicaStateMachine(controller: KafkaController)
extends Logging {
if(replicas.size > 0) {
info("Invoking state change to %s for replicas %s".format(targetState,
replicas.mkString(",")))
try {
- brokerRequestBatch.newBatch()
- replicas.foreach(r => handleStateChange(r, targetState, callbacks))
- brokerRequestBatch.sendRequestsToBrokers(controller.epoch)
- }catch {
+
+ val remainingTopicAndPartitions = collection.mutable.Set(replicas.map
{ partitionAndReplica =>
+ new TopicAndPartition(partitionAndReplica.topic,
partitionAndReplica.partition)
+ }.toArray:_*)
+ val remainingReplicas = collection.mutable.Set(replicas.toArray:_*)
+ do {
+          // First refresh all the LeaderAndIsr info from zookeeper.
+          // Make sure all the LeaderAndIsr information is successfully read and that this broker is still the valid controller.
+ val leaderAndIsrReadResults =
controller.refreshLeaderAndIsrFromZk(remainingTopicAndPartitions)
+ trace(s"Refreshed LeaderAndIsr information for replicas:
${leaderAndIsrReadResults}")
+ leaderAndIsrUpdateBatch.newBatch()
+ brokerRequestBatch.newBatch()
+ remainingReplicas.foreach(r => {
+ val leaderIsrAndControllerEpochOpt =
+ leaderAndIsrReadResults(TopicAndPartition(r.topic,
r.partition)).leaderIsrAndControllerEpochOpt
+ handleStateChange(r, leaderIsrAndControllerEpochOpt, targetState,
callbacks)
+ })
+          try {
+            leaderAndIsrUpdateBatch.writeLeaderAndIsrUpdateToZk(controller.epoch, Some(() => controller.isValidController))
+          } catch {
+            // The exception thrown here is most likely due to a zk disconnection or session expiration. In this case
+            // we swallow the error and retry. LeaderAndIsr updates that have been sent but not yet acked might be
+            // updated again, but that does not hurt.
+            case e: Throwable => error("Error while updating LeaderAndIsr in zookeeper.", e)
+          }
+ brokerRequestBatch.sendRequestsToBrokers(controller.epoch)
+
remainingTopicAndPartitions.retain(leaderAndIsrUpdateBatch.containsPartition(_))
+ remainingReplicas.retain(r =>
leaderAndIsrUpdateBatch.containsPartition(TopicAndPartition(r.topic,
r.partition)))
+ if (!remainingReplicas.isEmpty) {
+ debug(s"The following replicas are still waiting for state change:
$remainingReplicas.")
+ if (!controller.isValidController)
+ throw new StateChangeFailedException(s"Controller
${controller.config.brokerId} epoch ${controller.epoch}" +
+ s" is no longer the valid controller.")
+ }
+ } while (!remainingReplicas.isEmpty)
+
+ } catch {
case e: Throwable => error("Error while moving some replicas to %s
state".format(targetState), e)
}
}
@@ -154,7 +192,9 @@ class ReplicaStateMachine(controller: KafkaController)
extends Logging {
* @param partitionAndReplica The replica for which the state transition is
invoked
* @param targetState The end state that the replica should be moved to
*/
- def handleStateChange(partitionAndReplica: PartitionAndReplica, targetState:
ReplicaState,
+ def handleStateChange(partitionAndReplica: PartitionAndReplica,
+ leaderIsrAndControllerEpochOpt:
Option[LeaderIsrAndControllerEpoch],
+ targetState: ReplicaState,
callbacks: Callbacks) {
val topic = partitionAndReplica.topic
val partition = partitionAndReplica.partition
@@ -171,7 +211,6 @@ class ReplicaStateMachine(controller: KafkaController)
extends Logging {
case NewReplica =>
assertValidPreviousStates(partitionAndReplica,
List(NonExistentReplica), targetState)
// start replica as a follower to the current leader for its
partition
- val leaderIsrAndControllerEpochOpt =
ReplicationUtils.getLeaderIsrAndEpochForPartition(zkUtils, topic, partition)
leaderIsrAndControllerEpochOpt match {
case Some(leaderIsrAndControllerEpoch) =>
if(leaderIsrAndControllerEpoch.leaderAndIsr.leader == replicaId)
@@ -244,31 +283,15 @@ class ReplicaStateMachine(controller: KafkaController)
extends Logging {
// send stop replica command to the replica so that it stops
fetching from the leader
brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId),
topic, partition, deletePartition = false)
// As an optimization, the controller removes dead replicas from the
ISR
- val leaderAndIsrIsEmpty: Boolean =
- controllerContext.partitionLeadershipInfo.get(topicAndPartition)
match {
- case Some(currLeaderIsrAndControllerEpoch) =>
- controller.removeReplicaFromIsr(topic, partition, replicaId)
match {
- case Some(updatedLeaderIsrAndControllerEpoch) =>
- // send the shrunk ISR state change request to all the
remaining alive replicas of the partition.
- val currentAssignedReplicas =
controllerContext.partitionReplicaAssignment(topicAndPartition)
- if
(!controller.deleteTopicManager.isPartitionToBeDeleted(topicAndPartition)) {
-
brokerRequestBatch.addLeaderAndIsrRequestForBrokers(currentAssignedReplicas.filterNot(_
== replicaId),
- topic, partition, updatedLeaderIsrAndControllerEpoch,
replicaAssignment)
- }
- replicaState.put(partitionAndReplica, OfflineReplica)
- stateChangeLogger.trace("Controller %d epoch %d changed
state of replica %d for partition %s from %s to %s"
- .format(controllerId, controller.epoch, replicaId,
topicAndPartition, currState, targetState))
- false
- case None =>
- true
- }
- case None =>
- true
- }
- if (leaderAndIsrIsEmpty &&
!controller.deleteTopicManager.isPartitionToBeDeleted(topicAndPartition))
- throw new StateChangeFailedException(
- "Failed to change state of replica %d for partition %s since the
leader and isr path in zookeeper is empty"
- .format(replicaId, topicAndPartition))
+ leaderIsrAndControllerEpochOpt match {
+ case Some(currLeaderIsrAndControllerEpoch) =>
+ removeReplicaFromIsr(topic, partition, replicaId, currState,
currLeaderIsrAndControllerEpoch)
+ case None =>
+ if
(!controller.deleteTopicManager.isPartitionToBeDeleted(topicAndPartition))
+ throw new StateChangeFailedException(
+ "Failed to change state of replica %d for partition %s since
the leader and isr path in zookeeper is empty"
+ .format(replicaId, topicAndPartition))
+ }
}
}
catch {
@@ -278,6 +301,89 @@ class ReplicaStateMachine(controller: KafkaController)
extends Logging {
}
}
+  /**
+   * This method asynchronously removes the given replica from the ISR of the partition. Typically the controller is
+   * only responsible for updating the leader information of a partition, not its ISR, but as an optimization we let
+   * the controller remove replicas from the ISR when the replica state changes to OFFLINE. Whenever the controller
+   * successfully updates the ZK path it has to send a LeaderAndIsrRequest to the brokers, otherwise the brokers will
+   * not be able to update the ZK path to change the ISR. If there is no ZK path update from the controller, the
+   * controller does not need to send a LeaderAndIsrRequest and can simply update its local ISR information.
+   *
+   * @param topic             the topic
+   * @param partition         the partition id
+   * @param replicaId         the replica id
+   * @param currState         the current state of the replica
+   * @param leaderIsrAndEpoch the current LeaderAndIsr information, refreshed from zookeeper before the state change
+   */
+ def removeReplicaFromIsr(topic: String,
+ partition: Int,
+ replicaId: Int,
+ currState: ReplicaState,
+ leaderIsrAndEpoch: LeaderIsrAndControllerEpoch) = {
+ val topicAndPartition = TopicAndPartition(topic, partition)
+    // In cases where the states of multiple replicas of the same partition are changed in the same batch, we need to
+    // update the LeaderAndIsr based on the previous pending state change instead of using the original LeaderAndIsr
+    // information from zookeeper.
+ val currLeaderAndIsr =
+ if (leaderAndIsrUpdateBatch.containsPartition(topicAndPartition))
+
leaderAndIsrUpdateBatch.pendingLeaderAndIsrUpdate(topicAndPartition).newLeaderAndIsr
+ else
+ leaderIsrAndEpoch.leaderAndIsr
+ val currZkVersion = leaderIsrAndEpoch.leaderAndIsr.zkVersion
+ val currLeaderEpoch = leaderIsrAndEpoch.leaderAndIsr.leaderEpoch
+
+ debug(s"Removing replica ${replicaId} from ISR ${currLeaderAndIsr.isr}}
for partition $topicAndPartition.")
+ if (currLeaderAndIsr.isr.contains(replicaId)) {
+ // The caller has already checked the ISR and it contains the replica Id
to be removed.
+ val newLeader = if (replicaId == currLeaderAndIsr.leader)
LeaderAndIsr.NoLeader else currLeaderAndIsr.leader
+ var newIsr = currLeaderAndIsr.isr.filter(b => b != replicaId)
+
+ // if the replica to be removed from the ISR is the last surviving
member of the ISR and unclean leader election
+ // is disallowed for the corresponding topic, then we must preserve the
ISR membership so that the replica can
+ // eventually be restored as the leader.
+ if (newIsr.isEmpty && !LogConfig.fromProps(controller.config.originals,
AdminUtils.fetchEntityConfig(zkUtils,
+ ConfigType.Topic,
topicAndPartition.topic)).uncleanLeaderElectionEnable) {
+ info("Retaining last ISR %d of partition %s since unclean leader
election is disabled".format(replicaId, topicAndPartition))
+ newIsr = currLeaderAndIsr.isr
+ }
+
+ val newLeaderAndIsr = new LeaderAndIsr(newLeader, currLeaderEpoch + 1,
newIsr, currZkVersion + 1)
+ leaderAndIsrUpdateBatch.addLeaderAndIsrUpdate(topicAndPartition,
+ newLeaderAndIsr,
+ currZkVersion,
+
onUpdateLeaderAndIsrSuccess)
+ debug(s"Added new LeaderAndIsr update $newLeaderAndIsr for
$topic-$partition")
+ } else {
+ warn(s"Cannot remove replica $replicaId from ISR of partition
$topicAndPartition since it is not in " +
+ s"the ISR. Leader = ${currLeaderAndIsr.leader} ; ISR =
${currLeaderAndIsr.isr}")
+ updateLocalState(LeaderIsrAndControllerEpoch(currLeaderAndIsr,
controller.epoch))
+ }
+
+ // If the controller successfully updated zk path, update local info and
send LeaderAndIsrRequest to
+ // the replicas of the partition.
+ def onUpdateLeaderAndIsrSuccess(topicAndPartition: TopicAndPartition,
+ leaderAndIsrUpdateResult:
ZkLeaderAndIsrUpdateResult) = {
+ trace(s"Successfully updated LeaderAndIsr info for $topicAndPartition.
New LeaderAndIsr is" +
+ s" ${leaderAndIsrUpdateResult.leaderAndIsr}")
+ val updatedLeaderAndIsr = leaderAndIsrUpdateResult.leaderAndIsr
+ val updatedLeaderIsrAndControllerEpoch =
LeaderIsrAndControllerEpoch(updatedLeaderAndIsr, controller.epoch)
+ updateLocalState(updatedLeaderIsrAndControllerEpoch)
+ }
+
+ def updateLocalState(updatedLeaderIsrAndControllerEpoch:
LeaderIsrAndControllerEpoch) {
+ controllerContext.partitionLeadershipInfo.put(topicAndPartition,
updatedLeaderIsrAndControllerEpoch)
+ val currentAssignedReplicas =
controllerContext.partitionReplicaAssignment(topicAndPartition)
+ if
(!controller.deleteTopicManager.isPartitionToBeDeleted(topicAndPartition)) {
+
brokerRequestBatch.addLeaderAndIsrRequestForBrokers(currentAssignedReplicas.filterNot(_
== replicaId),
+ topic, partition, updatedLeaderIsrAndControllerEpoch,
currentAssignedReplicas)
+ }
+ replicaState.put(PartitionAndReplica(topic, partition, replicaId),
OfflineReplica)
+ stateChangeLogger.trace("Controller %d epoch %d changed state of replica
%d for partition %s from %s to %s"
+ .format(controllerId, controller.epoch, replicaId, topicAndPartition,
currState, OfflineReplica))
+ }
+ }
+
def areAllReplicasForTopicDeleted(topic: String): Boolean = {
val replicasForTopic = controller.controllerContext.replicasForTopic(topic)
val replicaStatesForTopic = replicasForTopic.map(r => (r,
replicaState(r))).toMap
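For context, here is a minimal, standalone sketch (not part of the patch) of the ISR-shrink decision in removeReplicaFromIsr above: drop the replica from the ISR, clear the leader if the replica was the leader, and preserve the last surviving ISR member when unclean leader election is disabled. LeaderAndIsrSketch is an illustrative stand-in for kafka.api.LeaderAndIsr.

case class LeaderAndIsrSketch(leader: Int, leaderEpoch: Int, isr: List[Int], zkVersion: Int)

object IsrShrinkSketch {
  val NoLeader = -1

  def shrinkIsr(current: LeaderAndIsrSketch, replicaId: Int, uncleanLeaderElectionEnabled: Boolean): LeaderAndIsrSketch = {
    // If the replica being removed is the leader, the partition is left without a leader (-1).
    val newLeader = if (replicaId == current.leader) NoLeader else current.leader
    var newIsr = current.isr.filter(_ != replicaId)
    // If this replica is the last ISR member and unclean leader election is disabled,
    // keep the ISR intact so the replica can eventually be restored as the leader.
    if (newIsr.isEmpty && !uncleanLeaderElectionEnabled)
      newIsr = current.isr
    LeaderAndIsrSketch(newLeader, current.leaderEpoch + 1, newIsr, current.zkVersion + 1)
  }

  def main(args: Array[String]): Unit = {
    val current = LeaderAndIsrSketch(leader = 1, leaderEpoch = 5, isr = List(1), zkVersion = 7)
    println(shrinkIsr(current, replicaId = 1, uncleanLeaderElectionEnabled = false)) // ISR preserved
    println(shrinkIsr(current, replicaId = 1, uncleanLeaderElectionEnabled = true))  // ISR emptied, leader becomes -1
  }
}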
diff --git
a/core/src/main/scala/kafka/controller/ZkLeaderAndIsrUpdateBatch.scala
b/core/src/main/scala/kafka/controller/ZkLeaderAndIsrUpdateBatch.scala
new file mode 100644
index 00000000000..7daa90368ce
--- /dev/null
+++ b/core/src/main/scala/kafka/controller/ZkLeaderAndIsrUpdateBatch.scala
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.controller
+
+import java.util.concurrent.CountDownLatch
+
+import kafka.api.LeaderAndIsr
+import kafka.common.TopicAndPartition
+import kafka.utils.ReplicationUtils.trace
+import kafka.utils.{ReplicationUtils, ZkUtils}
+
+/**
+ * A class that batches LeaderAndIsr updates together.
+ */
+class ZkLeaderAndIsrUpdateBatch(zkUtils: ZkUtils) {
+ val leaderAndIsrUpdates = new collection.mutable.HashMap[TopicAndPartition,
ZkLeaderAndIsrUpdate]
+
+ def newBatch() = leaderAndIsrUpdates.clear()
+
+ def addLeaderAndIsrUpdate(topicAndPartition: TopicAndPartition,
+ newLeaderAndIsr: LeaderAndIsr,
+ expectZkVersion: Int,
+ onSuccess: (TopicAndPartition,
ZkLeaderAndIsrUpdateResult) => Unit) = {
+    // We need to chain up the callbacks if there are multiple callbacks to be fired for the same partition.
+    // This is used by the ReplicaStateMachine, where multiple replicas of the same partition can change in the same batch.
+ val onSuccessCallbacks = {
+ if (leaderAndIsrUpdates.contains(topicAndPartition))
+ leaderAndIsrUpdates(topicAndPartition).onSuccessCallbacks :+ onSuccess
+ else
+ List(onSuccess)
+ }
+ leaderAndIsrUpdates += (topicAndPartition -> new
ZkLeaderAndIsrUpdate(newLeaderAndIsr, expectZkVersion, onSuccessCallbacks))
+ }
+
+ def completeLeaderAndIsrUpdate(topicAndPartition: TopicAndPartition) = {
+ leaderAndIsrUpdates -= topicAndPartition
+ }
+
+ def incompleteUpdates = this synchronized(leaderAndIsrUpdates.size)
+
+ def containsPartition(topicAndPartition: TopicAndPartition): Boolean =
+ leaderAndIsrUpdates.contains(topicAndPartition)
+
+ def pendingLeaderAndIsrUpdate(topicAndPartition: TopicAndPartition) = {
+ leaderAndIsrUpdates(topicAndPartition)
+ }
+
+ def writeLeaderAndIsrUpdateToZk(controllerEpoch: Int,
preConditionCheckerOpt: Option[() => Boolean] = None) =
+ ReplicationUtils.asyncUpdateLeaderAndIsr(zkUtils, this, controllerEpoch,
preConditionCheckerOpt)
+}
+
+/**
+ * A container class holding a pending LeaderAndIsr update to zookeeper.
+ * @param newLeaderAndIsr    The new LeaderAndIsr information
+ * @param expectZkVersion    The expected zkVersion of the path
+ * @param onSuccessCallbacks The callbacks to fire when the LeaderAndIsr update succeeds. We need a list because there
+ *                           might be multiple replica state changes for the same partition in one batch.
+ */
+class ZkLeaderAndIsrUpdate(val newLeaderAndIsr: LeaderAndIsr,
+ val expectZkVersion: Int,
+ val onSuccessCallbacks: List[(TopicAndPartition,
ZkLeaderAndIsrUpdateResult) => Unit])
+
+case class ZkLeaderAndIsrUpdateResult(val leaderAndIsr: LeaderAndIsr,
+ val newZkVersion: Int)
+
+case class ZkLeaderAndIsrReadResult(val leaderIsrAndControllerEpochOpt:
Option[LeaderIsrAndControllerEpoch],
+ val exceptionOpt: Option[Exception]) {
+ override def toString: String = "[" + leaderIsrAndControllerEpochOpt + "," +
exceptionOpt + "]"
+}
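For context, here is a minimal, standalone sketch (not part of the patch) of the callback-chaining idea in ZkLeaderAndIsrUpdateBatch.addLeaderAndIsrUpdate: when several updates target the same key within one batch, their success callbacks are accumulated and all fired once the single write for that key succeeds. The class and method names below are illustrative; in the patch the key is a TopicAndPartition and the result is a ZkLeaderAndIsrUpdateResult.

import scala.collection.mutable

class CallbackBatch[K, R] {
  private val pending = mutable.HashMap.empty[K, List[R => Unit]]

  // Chain callbacks: a second add() for the same key appends rather than overwrites.
  def add(key: K, onSuccess: R => Unit): Unit = {
    val callbacks =
      if (pending.contains(key)) pending(key) :+ onSuccess
      else List(onSuccess)
    pending += (key -> callbacks)
  }

  // Simulate a successful write for `key` and fire every chained callback with the result.
  def complete(key: K, result: R): Unit =
    pending.remove(key).foreach(_.foreach(callback => callback(result)))

  def size: Int = pending.size
}

object CallbackBatchDemo {
  def main(args: Array[String]): Unit = {
    val batch = new CallbackBatch[String, Int]
    batch.add("demo-topic-0", version => println(s"first callback, new zk version $version"))
    batch.add("demo-topic-0", version => println(s"second callback, new zk version $version"))
    batch.complete("demo-topic-0", 2) // both callbacks fire once the single write succeeds
  }
}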
diff --git a/core/src/main/scala/kafka/utils/ReplicationUtils.scala
b/core/src/main/scala/kafka/utils/ReplicationUtils.scala
index 31e8a92cf0c..e97ccbb72fb 100644
--- a/core/src/main/scala/kafka/utils/ReplicationUtils.scala
+++ b/core/src/main/scala/kafka/utils/ReplicationUtils.scala
@@ -17,10 +17,13 @@
package kafka.utils
+import java.util.concurrent.CountDownLatch
+
import kafka.api.LeaderAndIsr
import kafka.common.TopicAndPartition
-import kafka.controller.{IsrChangeNotificationListener,
LeaderIsrAndControllerEpoch}
+import kafka.controller.{ZkLeaderAndIsrUpdate, ZkLeaderAndIsrReadResult,
ZkLeaderAndIsrUpdateResult, ZkLeaderAndIsrUpdateBatch,
IsrChangeNotificationListener, LeaderIsrAndControllerEpoch}
import kafka.utils.ZkUtils._
+import org.apache.zookeeper.ZooKeeper
import org.apache.zookeeper.data.Stat
import scala.collection._
@@ -39,6 +42,58 @@ object ReplicationUtils extends Logging {
updatePersistentPath
}
+  /**
+   * This method asynchronously updates the leader and ISR information in a ZkLeaderAndIsrUpdateBatch. It blocks
+   * until all the updates have completed.
+   *
+   * The method takes an optional precondition checker and makes sure the precondition is met before updating the
+   * leader and ISR. This is useful for the controller: when the controller updates the leader and ISR in zookeeper,
+   * it must remain the controller during the entire update process. To guarantee that, this method first takes the
+   * current ZooKeeper session and then checks that the current broker is still the controller. If it is, this method
+   * uses that ZooKeeper session for all the updates.
+   *
+   * During the leader and ISR update, if the zookeeper connection is lost or the ZooKeeper session expires, the
+   * current ZooKeeper session throws an exception. We propagate the exception to the caller and let the caller
+   * decide whether to continue or abort.
+   *
+   * Unlike the synchronous leader and ISR update call, which takes an optional checker function, the async call does
+   * not verify whether the leader and ISR data is unchanged. The update fails on a zkVersion mismatch even if the
+   * data is the same, because we cannot read data in the zookeeper event thread.
+   */
+ def asyncUpdateLeaderAndIsr(zkUtils: ZkUtils,
+ leaderAndIsrUpdateBatch:
ZkLeaderAndIsrUpdateBatch,
+ controllerEpoch: Int,
+ preconditionCheckerOpt: Option[() => Boolean] =
None) = {
+ val unprocessedUpdates = new
CountDownLatch(leaderAndIsrUpdateBatch.incompleteUpdates)
+ zkUtils.withCurrentSession(preconditionCheckerOpt) { zk =>
+ leaderAndIsrUpdateBatch.leaderAndIsrUpdates.foreach { case (tp, update)
=> {
+ val path = getTopicPartitionLeaderAndIsrPath(tp.topic, tp.partition)
+ val newLeaderAndIsr = update.newLeaderAndIsr
+ val newLeaderData = zkUtils.leaderAndIsrZkData(newLeaderAndIsr,
controllerEpoch)
+ zkUtils.asyncConditionalUpdatePersistentPath(path, newLeaderData,
update.expectZkVersion,
+ (updateSucceeded, updatedZkVersion) => {
+          // Remove the successfully updated partitions.
+          // We do not handle failure and retry here, to keep the ZooKeeper EventThread lightweight. The caller is
+          // responsible for checking whether the update batch is finished or a retry is needed.
+ try {
+ trace(s"Received LeaderAndIsr update callback of update
$newLeaderAndIsr for $tp. " +
+ s"UpdateSucceeded = $updateSucceeded")
+ if (updateSucceeded) {
+ leaderAndIsrUpdateBatch.completeLeaderAndIsrUpdate(tp)
+ update.onSuccessCallbacks.foreach { case onSuccess =>
+ onSuccess(tp, new
ZkLeaderAndIsrUpdateResult(newLeaderAndIsr, updatedZkVersion))
+ }
+ }
+ } finally {
+ unprocessedUpdates.countDown()
+ }
+ }, Some(zk))
+ }}
+ }
+ unprocessedUpdates.await()
+ }
+
def propagateIsrChanges(zkUtils: ZkUtils, isrChangeSet:
Set[TopicAndPartition]): Unit = {
val isrChangeNotificationPath: String =
zkUtils.createSequentialPersistentPath(
ZkUtils.IsrChangeNotificationPath + "/" + IsrChangeNotificationPrefix,
@@ -75,6 +130,34 @@ object ReplicationUtils extends Logging {
leaderAndIsrOpt.flatMap(leaderAndIsrStr =>
parseLeaderAndIsr(leaderAndIsrStr, leaderAndIsrPath, stat))
}
+ def asyncGetLeaderIsrAndEpochForPartitions(zkUtils: ZkUtils,
+ partitions:
Set[TopicAndPartition]): Map[TopicAndPartition, ZkLeaderAndIsrReadResult] = {
+ val unprocessedReads = new CountDownLatch(partitions.size)
+ val zkLeaderAndIsrReadResults = new mutable.HashMap[TopicAndPartition,
ZkLeaderAndIsrReadResult]
+ partitions.foreach { case tap =>
+ val leaderAndIsrPath = getTopicPartitionLeaderAndIsrPath(tap.topic,
tap.partition)
+ zkUtils.asyncReadDataMaybeNull(leaderAndIsrPath, new ZkReadCallback {
+ override def handle(dataOpt: Option[String], stat: Stat, exceptionOpt:
Option[Exception]): Unit = {
+ try {
+ zkLeaderAndIsrReadResults += tap -> {
+ if (exceptionOpt.isEmpty) {
+ val leaderIsrControllerEpochOpt =
+ dataOpt.flatMap(leaderAndIsrStr =>
parseLeaderAndIsr(leaderAndIsrStr, leaderAndIsrPath, stat))
+ new ZkLeaderAndIsrReadResult(leaderIsrControllerEpochOpt,
None)
+ }
+ else
+ new ZkLeaderAndIsrReadResult(None, exceptionOpt)
+ }
+ } finally {
+ unprocessedReads.countDown()
+ }
+ }
+ })
+ }
+ unprocessedReads.await()
+ zkLeaderAndIsrReadResults
+ }
+
private def parseLeaderAndIsr(leaderAndIsrStr: String, path: String, stat:
Stat)
: Option[LeaderIsrAndControllerEpoch] = {
Json.parseFull(leaderAndIsrStr).flatMap {m =>
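For context, here is a minimal, standalone sketch (not part of the patch) of the fan-out-and-await pattern that asyncGetLeaderIsrAndEpochForPartitions and asyncUpdateLeaderAndIsr rely on: issue all asynchronous operations, count a CountDownLatch down inside each completion callback (in a finally block so a throwing callback cannot leak a permit), then block until every callback has fired. The executor below is only a stand-in for ZooKeeper's callback thread.

import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

object AsyncBatchAwait {
  def main(args: Array[String]): Unit = {
    val paths = (0 until 10).map(i => s"/brokers/topics/demo/partitions/$i/state")
    val latch = new CountDownLatch(paths.size)
    val executor = Executors.newFixedThreadPool(4) // stands in for ZooKeeper's callback thread

    paths.foreach { path =>
      executor.submit(new Runnable {
        override def run(): Unit = {
          try {
            // In the patch this is the ZooKeeper read/write callback handling the result.
            println(s"completed asynchronous operation for $path")
          } finally {
            latch.countDown() // always count down, even if the callback body throws
          }
        }
      })
    }

    latch.await(10, TimeUnit.SECONDS) // block until every callback has fired (or time out)
    executor.shutdown()
  }
}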
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala
b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 49d3cfaaf8b..796aaba7b95 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -17,7 +17,7 @@
package kafka.utils
-import java.util.concurrent.CountDownLatch
+import java.util.concurrent.{Callable, CountDownLatch}
import kafka.admin._
import kafka.api.{ApiVersion, KAFKA_0_10_0_IV0, LeaderAndIsr}
@@ -29,10 +29,10 @@ import kafka.server.ConfigType
import kafka.utils.ZkUtils._
import org.I0Itec.zkclient.exception.{ZkBadVersionException, ZkException,
ZkMarshallingError, ZkNoNodeException, ZkNodeExistsException}
import org.I0Itec.zkclient.serialize.ZkSerializer
-import org.I0Itec.zkclient.{ZkClient, ZkConnection}
+import org.I0Itec.zkclient.{IZkConnection, ZkConnection}
import org.apache.kafka.common.config.ConfigException
import org.apache.kafka.common.protocol.SecurityProtocol
-import org.apache.zookeeper.AsyncCallback.{DataCallback, StringCallback}
+import org.apache.zookeeper.AsyncCallback.{DataCallback, StatCallback,
StringCallback}
import org.apache.zookeeper.KeeperException.Code
import org.apache.zookeeper.data.{ACL, Stat}
import org.apache.zookeeper.{CreateMode, KeeperException, ZooDefs, ZooKeeper}
@@ -465,6 +465,28 @@ class ZkUtils(val zkClient: ZkClient,
}
}
+  /**
+   * Asynchronously makes a conditional update on a persistent path. Unlike the synchronous update call, this method
+   * does not take an optional checker function. It is the caller's responsibility to refresh the data in ZK and
+   * decide whether to continue or abandon the update.
+   * @param path          The persistent path to update
+   * @param data          The data to write
+   * @param expectVersion The expected zkVersion
+   * @param resultHandler A function that takes (updateSucceeded, newZkVersion) as arguments; it is invoked as a
+   *                      callback when the update finishes
+   * @param zkOpt         The zookeeper session to use for this update
+   */
+ def asyncConditionalUpdatePersistentPath(path: String,
+ data: String,
+ expectVersion: Int,
+ resultHandler: (Boolean, Int) =>
Unit,
+ zkOpt: Option[ZooKeeper] = None) = {
+ val callbackWithData = new ZkWriteCallbackWithData(data, expectVersion) {
+ override def handle(updateSucceeded: Boolean, newZkVersion: Int) =
resultHandler(updateSucceeded, newZkVersion)
+ }
+ zkClient.asyncWriteDataAndReturnStat(path, data, callbackWithData,
expectVersion, zkOpt)
+ }
+
/**
* Conditional update the persistent path data, return (true, newVersion) if
it succeeds, otherwise (the current
* version is not the expected version, etc.) return (false, -1). If path
doesn't exist, throws ZkNoNodeException
@@ -554,6 +576,10 @@ class ZkUtils(val zkClient: ZkClient,
dataAndStat
}
+ def asyncReadDataMaybeNull(path: String, callback: DataCallback) = {
+ zkClient.asyncReadData(path, callback)
+ }
+
def getChildren(path: String): Seq[String] = {
import scala.collection.JavaConversions._
// triggers implicit conversion from java list to scala Seq
@@ -861,6 +887,13 @@ class ZkUtils(val zkClient: ZkClient,
}
}
+ def withCurrentSession(preconditionChecker: Option[() => Boolean] = None)(f:
ZooKeeper => Unit) {
+ val currentZk = zkClient.currentZk
+ if (!preconditionChecker.fold(true)(f => f()))
+ throw new IllegalStateException("The pre-check failed.")
+ f(currentZk)
+ }
+
def close() {
if(zkClient != null) {
zkClient.close()
@@ -1112,3 +1145,67 @@ class ZKCheckedEphemeral(path: String,
}
}
}
+
+private[kafka] class ZkClient(zkConnection: IZkConnection, connectionTimeout:
Int, zkSerializer: ZkSerializer)
+ extends org.I0Itec.zkclient.ZkClient(zkConnection, connectionTimeout,
zkSerializer) {
+
+ def this(zkServers: String, sessionTimeout: Int, connectionTimeout: Int,
zkSerializer: ZkSerializer) {
+ this(new ZkConnection(zkServers, sessionTimeout), connectionTimeout,
zkSerializer)
+ }
+
+ def asyncWriteDataAndReturnStat(path: String,
+ datat: String,
+ callbackWithData: StatCallback,
+ expectedVersion: Int,
+ zkOpt: Option[ZooKeeper] = None) = {
+ val data: Array[Byte] = ZKStringSerializer.serialize(datat)
+ var zk = zkOpt.getOrElse(currentZk)
+ zk.setData(path, data, expectedVersion, callbackWithData, null)
+ }
+
+ def asyncReadData(path: String, callback: DataCallback) {
+ retryUntilConnected(new Callable[Object] {
+ def call: Object = {
+ currentZk.getData(path, false, callback, null)
+ null
+ }
+ })
+ }
+
+ def currentZk = _connection.asInstanceOf[ZkConnection].getZookeeper
+}
+
+private[kafka] abstract class ZkReadCallback extends DataCallback {
+ override def processResult(rc: Int, path: String, ctx: Object, data:
Array[Byte], stat: Stat) {
+ val code = Code.get(rc)
+ val datat = code match {
+ case Code.OK =>
Some(ZKStringSerializer.deserialize(data).asInstanceOf[String])
+ case _ => None
+ }
+    // We mimic the sync ZkClient behavior here and return an exception for result codes other than OK and NONODE.
+    val exceptionOpt = if (code != Code.OK && code != Code.NONODE) Some(KeeperException.create(Code.get(rc))) else None
+ handle(datat, stat, exceptionOpt)
+ }
+
+ def handle(dataOpt: Option[String], stat: Stat, exceptionOpt:
Option[Exception])
+}
+
+private[kafka] abstract class ZkWriteCallbackWithData(val data: String,
+ val expectVersion: Int)
+ extends StatCallback with Logging {
+
+ override def processResult(rc: Int, path: String, ctx: Object, stat: Stat) {
+ val code = Code.get(rc)
+ trace(s"Received return code $code for update of zk path $path")
+ val (updateSucceeded, newZkVersion) = code match {
+ case Code.OK => (true, stat.getVersion)
+ case _ =>
+ warn("Conditional update of path %s with data %s and expected version
%d failed due to %s".format(path, data,
+ expectVersion, KeeperException.create(code).getMessage))
+ (false, -1)
+ }
+ handle(updateSucceeded, newZkVersion)
+ }
+
+ def handle(updateSucceeded: Boolean, newZkVersion: Int)
+}
diff --git
a/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala
b/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala
index 818229c2510..816c7188b70 100644
--- a/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala
@@ -18,9 +18,8 @@
package kafka.consumer
import org.easymock.EasyMock
-import org.I0Itec.zkclient.ZkClient
import org.apache.zookeeper.data.Stat
-import kafka.utils.{TestUtils, Logging, ZkUtils, Json}
+import kafka.utils.{ZkClient, TestUtils, Logging, ZkUtils, Json}
import org.junit.Assert._
import kafka.common.TopicAndPartition
import kafka.consumer.PartitionAssignorTest.StaticSubscriptionInfo
diff --git
a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
index b725d8b59ee..8ffad86692d 100755
--- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
@@ -251,6 +251,7 @@ class UncleanLeaderElectionTest extends
ZooKeeperTestHarness {
produceMessage(servers, topic, "third")
waitUntilMetadataIsPropagated(servers, topic, partitionId)
+ Thread.sleep(1000)
servers.filter(server => server.config.brokerId == leaderId).map(server =>
shutdownServer(server))
// verify clean leader transition to ISR follower
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index a5a8df1e32a..83a999342f4 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -25,8 +25,7 @@ import kafka.api.{FetchResponsePartitionData,
PartitionFetchInfo}
import kafka.cluster.Broker
import kafka.common.TopicAndPartition
import kafka.message.{ByteBufferMessageSet, Message}
-import kafka.utils.{MockScheduler, MockTime, TestUtils, ZkUtils}
-import org.I0Itec.zkclient.ZkClient
+import kafka.utils.{ZkClient, MockScheduler, MockTime, TestUtils, ZkUtils}
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.LeaderAndIsrRequest
diff --git a/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
b/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
index d4ddb2f7ced..4b7fb2acd71 100644
--- a/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
@@ -17,7 +17,7 @@
package kafka.utils
-import kafka.controller.LeaderIsrAndControllerEpoch
+import kafka.controller.{ZkLeaderAndIsrUpdateBatch,
LeaderIsrAndControllerEpoch}
import kafka.server.{ReplicaFetcherManager, KafkaConfig}
import kafka.api.LeaderAndIsr
import kafka.zk.ZooKeeperTestHarness
@@ -94,6 +94,57 @@ class ReplicationUtilsTest extends ZooKeeperTestHarness {
"my-topic-test", partitionId, newLeaderAndIsr3, controllerEpoch,
zkVersion + 1)
assertFalse(updateSucceeded3)
assertEquals(newZkVersion3,-1)
+
+ //test async update leader and isr
+ val topicAndPartition = TopicAndPartition("my-topic-test", partitionId)
+ val leaderAndIsrUpdateBatch = new ZkLeaderAndIsrUpdateBatch(zkUtils)
+
+ // Test regular update
+ var updateSucceeded4 = false
+ var newZkVersion4 = -1
+ val newLeaderAndIsr4 = new LeaderAndIsr(brokerId, leaderEpoch + 1, replicas, zkVersion = 2)
+ leaderAndIsrUpdateBatch.addLeaderAndIsrUpdate(topicAndPartition, newLeaderAndIsr4, expectZkVersion = 1,
+ (_, updateResult) => {
+ updateSucceeded4 = true
+ newZkVersion4 = updateResult.newZkVersion
+ }
+ )
+ leaderAndIsrUpdateBatch.writeLeaderAndIsrUpdateToZk(controllerEpoch)
+ assertTrue(updateSucceeded4)
+ assertEquals(2, newZkVersion4)
+ assertEquals(0, leaderAndIsrUpdateBatch.incompleteUpdates)
+
+ // Test mismatched zkversion with same data
+ var updateSucceeded5 = false
+ var newZkVersion5 = -1
+ val newLeaderAndIsr5 = new LeaderAndIsr(brokerId, leaderEpoch + 1, replicas, zkVersion = 3)
+ leaderAndIsrUpdateBatch.addLeaderAndIsrUpdate(topicAndPartition, newLeaderAndIsr5, expectZkVersion = 3,
+ (_, updateResult) => {
+ updateSucceeded5 = true
+ newZkVersion5 = updateResult.newZkVersion
+ }
+ )
+ leaderAndIsrUpdateBatch.writeLeaderAndIsrUpdateToZk(controllerEpoch)
+ assertFalse(updateSucceeded5)
+ assertEquals(-1, newZkVersion5)
+ assertEquals(1, leaderAndIsrUpdateBatch.incompleteUpdates)
+ assertTrue(leaderAndIsrUpdateBatch.containsPartition(topicAndPartition))
+
+ // Test mismatched zkversion with different data
+ var updateSucceeded6 = false
+ var newZkVersion6 = -1
+ val newLeaderAndIsr6 = new LeaderAndIsr(brokerId, leaderEpoch + 2, replicas, zkVersion = 3)
+ leaderAndIsrUpdateBatch.addLeaderAndIsrUpdate(topicAndPartition, newLeaderAndIsr6, expectZkVersion = 3,
+ (_, updateResult) => {
+ updateSucceeded6 = true
+ newZkVersion6 = updateResult.newZkVersion
+ }
+ )
+ leaderAndIsrUpdateBatch.writeLeaderAndIsrUpdateToZk(controllerEpoch)
+ assertFalse(updateSucceeded6)
+ assertEquals(-1, newZkVersion6)
+ assertEquals(1, leaderAndIsrUpdateBatch.incompleteUpdates)
+ assertTrue(leaderAndIsrUpdateBatch.containsPartition(topicAndPartition))
}
@Test
diff --git a/core/src/test/scala/unit/kafka/zk/ZkClientAsyncOperationTest.scala b/core/src/test/scala/unit/kafka/zk/ZkClientAsyncOperationTest.scala
new file mode 100644
index 00000000000..660ca463b5d
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/zk/ZkClientAsyncOperationTest.scala
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.zk
+
+import kafka.utils.{ZkReadCallback, TestUtils}
+import org.apache.zookeeper.data.Stat
+import org.junit.Test
+import org.junit.Assert.{assertEquals, assertFalse, assertTrue}
+
+class ZkClientAsyncOperationTest extends ZooKeeperTestHarness {
+
+ @Test
+ def testAsyncWrite() {
+ val numPaths = 1000
+ val paths = (0 until numPaths).map { i => {
+ val path = "/testPath" + i
+ zkUtils.createPersistentPath(path, i.toString)
+ val (dataOpt, stat) = zkUtils.readDataMaybeNull(path)
+ assertEquals(i.toString, dataOpt.get)
+ val zkVersion = stat.getVersion
+ assertEquals(0, zkVersion)
+ path
+ }}
+
+ var callbackFired = 0
+ var errorInCallback = false
+ def handleResult(updateSucceeded: Boolean, newZkVersion: Int) = {
+ if(!updateSucceeded)
+ errorInCallback = true
+ if (newZkVersion != 1)
+ errorInCallback = true
+ callbackFired += 1
+ }
+ paths.foreach { path =>
+ zkUtils.asyncConditionalUpdatePersistentPath(path, "-1", 0, handleResult)
+ }
+ TestUtils.waitUntilTrue(() => callbackFired == numPaths, "Callback did not fire before timeout.")
+ assertFalse(errorInCallback)
+ paths.foreach { path =>
+ val (newData, newStat) = zkUtils.readDataMaybeNull(path)
+ assertEquals("-1", newData.get)
+ assertEquals(1, newStat.getVersion)
+ }
+ }
+
+ @Test
+ def testAsyncRead() {
+ val numPaths = 1000
+ val paths = (0 until numPaths).map { i => {
+ val path = "/testPath" + i
+ zkUtils.createPersistentPath(path, i.toString)
+ val (dataOpt, stat) = zkUtils.readDataMaybeNull(path)
+ assertEquals(i.toString, dataOpt.get)
+ val zkVersion = stat.getVersion
+ assertEquals(0, zkVersion)
+ path
+ }}
+
+ var writeCallbackFired = 0
+ def handleWriteResult(updateSucceeded: Boolean, newZkVersion: Int) = {
+ assertTrue(updateSucceeded)
+ assertEquals(1, newZkVersion)
+ writeCallbackFired += 1
+ }
+ paths.foreach { path =>
+ zkUtils.asyncConditionalUpdatePersistentPath(path, path + "data", 0, handleWriteResult)
+ }
+ TestUtils.waitUntilTrue(() => writeCallbackFired == numPaths, "Callback did not fire before timeout.")
+
+ var readCallbackFired = 0
+ var errorInCallback = false
+ paths.foreach { path => {
+ zkUtils.asyncReadDataMaybeNull(path, new ZkReadCallback() {
+ override def handle(dataOpt: Option[String], stat: Stat, exceptionOpt: Option[Exception]) = {
+ if (!exceptionOpt.isEmpty)
+ errorInCallback = true
+ if (path + "data" != dataOpt.get)
+ errorInCallback = true
+ if (stat.getVersion != 1)
+ errorInCallback = true
+ readCallbackFired += 1
+ }
+ })
+ }}
+ TestUtils.waitUntilTrue(() => readCallbackFired == numPaths, "Callback did not fire before timeout.")
+ assertFalse(errorInCallback)
+ }
+
+}
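For orientation, here is a minimal sketch (not part of the patch) of how the asynchronous conditional-update path exercised by the test above could be used to pipeline a batch of ZooKeeper writes. It assumes, as the tests suggest, that zkUtils.asyncConditionalUpdatePersistentPath accepts a (Boolean, Int) => Unit completion callback; the helper name pipelinedConditionalUpdates is hypothetical.

    import java.util.concurrent.CountDownLatch
    import java.util.concurrent.atomic.AtomicInteger
    import kafka.utils.ZkUtils

    // Sketch: issue every conditional update without waiting for its round trip,
    // then block once for the whole batch instead of once per write.
    def pipelinedConditionalUpdates(zkUtils: ZkUtils,
                                    updates: Map[String, String],
                                    expectVersion: Int): Int = {
      val remaining = new CountDownLatch(updates.size)
      val failures = new AtomicInteger(0)
      updates.foreach { case (path, data) =>
        zkUtils.asyncConditionalUpdatePersistentPath(path, data, expectVersion,
          (updateSucceeded: Boolean, newZkVersion: Int) => {
            if (!updateSucceeded) failures.incrementAndGet() // e.g. zkVersion mismatch
            remaining.countDown()
          })
      }
      remaining.await()   // a single wait for the whole batch
      failures.get()      // number of updates that did not succeed
    }

The tests above follow the same pattern, using TestUtils.waitUntilTrue on a callback counter instead of a latch.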
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Speed up controlled shutdown.
> -----------------------------
>
> Key: KAFKA-3436
> URL: https://issues.apache.org/jira/browse/KAFKA-3436
> Project: Kafka
> Issue Type: Improvement
> Affects Versions: 0.9.0.0
> Reporter: Jiangjie Qin
> Assignee: Jiangjie Qin
>
> Currently, rolling bouncing a Kafka cluster with tens of thousands of partitions
> can take a very long time (~2 min for each broker with ~5000 partitions/broker in
> our environment). The majority of the time is spent shutting down the brokers.
> The time to shut down a broker usually includes the following parts:
> T1: During controlled shutdown, people usually want to make sure there are no
> under-replicated partitions. So shutting down a broker during a rolling bounce
> has to wait for the previously restarted broker to catch up. This is T1.
> T2: The time to send the controlled shutdown request and receive the controlled
> shutdown response. Currently a controlled shutdown request triggers many
> LeaderAndIsrRequests and UpdateMetadataRequests, and also involves many
> ZooKeeper updates performed serially.
> T3: The actual time to shut down all the components. It is usually small
> compared with T1 and T2.
> T1 is related to:
> A) the inbound throughput on the cluster, and
> B) the "down" time of the broker (the time between when the replica fetchers
> stop and when they restart).
> The heavier the traffic, or the longer the broker stops fetching, the longer it
> takes for the broker to catch up and get back into the ISR, and therefore the
> longer T1 will be. Assume:
> * the inbound network traffic is X bytes/second on a broker
> * the time T1.B ("down" time) mentioned above is T
> Theoretically it will take (X * T) / (NetworkBandwidth - X) =
> InboundNetworkUtilization * T / (1 - InboundNetworkUtilization) for the broker
> to catch up after the restart (see the worked example after this description).
> While X is out of our control, T is largely related to T2.
> The purpose of this ticket is to reduce T2 by:
> 1. Batching the LeaderAndIsrRequests and UpdateMetadataRequests during
> controlled shutdown.
> 2. Using async ZooKeeper writes to pipeline ZooKeeper updates. According to the
> ZooKeeper wiki (https://wiki.apache.org/hadoop/ZooKeeper/Performance), a 3-node
> ZK cluster should be able to handle 20K writes per second (1 KB each). So if we
> use async writes, we will likely be able to reduce the ZooKeeper update time to
> a few seconds or even to the sub-second level.
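As a rough sanity check of the catch-up estimate above, here is a small sketch with illustrative numbers; the inbound rate, down time, and NIC bandwidth below are assumptions for the example, not figures from the ticket.

    // Illustrative numbers only; X, T, and the bandwidth are assumed for this example.
    val inboundBytesPerSec = 40L * 1024 * 1024       // X: assumed 40 MB/s of inbound traffic
    val downTimeSec = 120.0                          // T: assumed 2 min of "down" time
    val bandwidthBytesPerSec = 125L * 1024 * 1024    // assumed ~1 Gbps network interface
    // catchUpTime = X * T / (NetworkBandwidth - X), as in the description above
    val catchUpSec = inboundBytesPerSec * downTimeSec /
      (bandwidthBytesPerSec - inboundBytesPerSec)    // ~56 seconds to get back into the ISR

Shrinking T directly shrinks this catch-up time, which is why reducing T2 matters.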
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)