This is an automated email from the ASF dual-hosted git repository.
lindong pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 297fb39 KAFKA-6082; Fence zookeeper updates with controller epoch
zkVersion
297fb39 is described below
commit 297fb396a0038addea06a76fd4ab9a451eb7562e
Author: Zhanxiang (Patrick) Huang <[email protected]>
AuthorDate: Fri Sep 7 14:17:49 2018 -0700
KAFKA-6082; Fence zookeeper updates with controller epoch zkVersion
This PR aims to enforce that the controller can only update ZooKeeper
states after checking the controller epoch zkVersion. The check and the ZooKeeper
state updates are wrapped in ZooKeeper multi() operations to ensure that
they are done atomically. This PR is necessary to resolve issues caused by
multiple controllers (e.g. an old controller updating ZooKeeper states before
it resigns, which is possible during controller failover given the single-
threaded event queue model we have).
This PR includes the following changes:
- Add MultiOp request and response in ZooKeeperClient
- Ensure all ZooKeeper updates done by the controller are protected by checking
the current controller epoch zkVersion
- Modify test cases in KafkaZkClientTest to test a mismatched controller epoch
zkVersion
Tests Done:
- Unit tests (with updated tests to test a mismatched controller epoch
zkVersion)
- Existing integration tests
Author: Zhanxiang (Patrick) Huang <[email protected]>
Reviewers: Jun Rao <[email protected]>, Dong Lin <[email protected]>,
Manikumar Reddy O <[email protected]>
Closes #5101 from hzxa21/KAFKA-6082
---
core/src/main/scala/kafka/cluster/Partition.scala | 2 +-
.../scala/kafka/controller/ControllerContext.scala | 4 +-
.../kafka/controller/ControllerEventManager.scala | 10 +-
.../scala/kafka/controller/KafkaController.scala | 161 +++++++-------
.../kafka/controller/PartitionStateMachine.scala | 11 +-
.../kafka/controller/ReplicaStateMachine.scala | 6 +-
.../kafka/controller/TopicDeletionManager.scala | 8 +-
.../main/scala/kafka/server/ReplicaManager.scala | 2 +-
core/src/main/scala/kafka/zk/KafkaZkClient.scala | 232 ++++++++++++++++-----
.../scala/kafka/zookeeper/ZooKeeperClient.scala | 134 ++++++++----
.../admin/ReassignPartitionsClusterTest.scala | 4 +-
.../controller/ControllerEventManagerTest.scala | 2 +-
.../controller/ControllerIntegrationTest.scala | 153 +++++++++++---
.../controller/PartitionStateMachineTest.scala | 16 +-
.../kafka/controller/ReplicaStateMachineTest.scala | 2 +-
.../unit/kafka/utils/LogCaptureAppender.scala | 66 ++++++
.../unit/kafka/utils/ReplicationUtilsTest.scala | 4 +-
.../test/scala/unit/kafka/utils/TestUtils.scala | 4 +-
.../scala/unit/kafka/zk/KafkaZkClientTest.scala | 157 +++++++++-----
19 files changed, 680 insertions(+), 298 deletions(-)
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala
b/core/src/main/scala/kafka/cluster/Partition.scala
index 22c1508..d76d6d0 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -73,7 +73,7 @@ class Partition(val topic: String,
* the controller sends it a start replica command containing the leader for
each partition that the broker hosts.
* In addition to the leader, the controller can also send the epoch of the
controller that elected the leader for
* each partition. */
- private var controllerEpoch: Int = KafkaController.InitialControllerEpoch - 1
+ private var controllerEpoch: Int = KafkaController.InitialControllerEpoch
this.logIdent = s"[Partition $topicPartition broker=$localBrokerId] "
private def isReplicaLocal(replicaId: Int) : Boolean = replicaId ==
localBrokerId || replicaId == Request.FutureLocalReplicaId
diff --git a/core/src/main/scala/kafka/controller/ControllerContext.scala
b/core/src/main/scala/kafka/controller/ControllerContext.scala
index f4671cf..20c3de0 100644
--- a/core/src/main/scala/kafka/controller/ControllerContext.scala
+++ b/core/src/main/scala/kafka/controller/ControllerContext.scala
@@ -28,8 +28,8 @@ class ControllerContext {
var controllerChannelManager: ControllerChannelManager = null
var shuttingDownBrokerIds: mutable.Set[Int] = mutable.Set.empty
- var epoch: Int = KafkaController.InitialControllerEpoch - 1
- var epochZkVersion: Int = KafkaController.InitialControllerEpochZkVersion - 1
+ var epoch: Int = KafkaController.InitialControllerEpoch
+ var epochZkVersion: Int = KafkaController.InitialControllerEpochZkVersion
var allTopics: Set[String] = Set.empty
private var partitionReplicaAssignmentUnderlying: mutable.Map[String,
mutable.Map[Int, Seq[Int]]] = mutable.Map.empty
val partitionLeadershipInfo: mutable.Map[TopicPartition,
LeaderIsrAndControllerEpoch] = mutable.Map.empty
diff --git a/core/src/main/scala/kafka/controller/ControllerEventManager.scala
b/core/src/main/scala/kafka/controller/ControllerEventManager.scala
index 13967e0..c93e9e7 100644
--- a/core/src/main/scala/kafka/controller/ControllerEventManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerEventManager.scala
@@ -24,6 +24,7 @@ import com.yammer.metrics.core.Gauge
import kafka.metrics.{KafkaMetricsGroup, KafkaTimer}
import kafka.utils.CoreUtils.inLock
import kafka.utils.ShutdownableThread
+import org.apache.kafka.common.errors.ControllerMovedException
import org.apache.kafka.common.utils.Time
import scala.collection._
@@ -32,12 +33,14 @@ object ControllerEventManager {
val ControllerEventThreadName = "controller-event-thread"
}
class ControllerEventManager(controllerId: Int, rateAndTimeMetrics:
Map[ControllerState, KafkaTimer],
- eventProcessedListener: ControllerEvent => Unit)
extends KafkaMetricsGroup {
+ eventProcessedListener: ControllerEvent => Unit,
+ controllerMovedListener: () => Unit) extends
KafkaMetricsGroup {
@volatile private var _state: ControllerState = ControllerState.Idle
private val putLock = new ReentrantLock()
private val queue = new LinkedBlockingQueue[ControllerEvent]
- private val thread = new
ControllerEventThread(ControllerEventManager.ControllerEventThreadName)
+ // Visible for test
+ private[controller] val thread = new
ControllerEventThread(ControllerEventManager.ControllerEventThreadName)
private val time = Time.SYSTEM
private val eventQueueTimeHist = newHistogram("EventQueueTimeMs")
@@ -86,6 +89,9 @@ class ControllerEventManager(controllerId: Int,
rateAndTimeMetrics: Map[Controll
controllerEvent.process()
}
} catch {
+ case e: ControllerMovedException =>
+ info(s"Controller moved to another broker when processing
$controllerEvent.", e)
+ controllerMovedListener()
case e: Throwable => error(s"Error processing event
$controllerEvent", e)
}
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala
b/core/src/main/scala/kafka/controller/KafkaController.scala
index 286768f..379e66d 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -35,14 +35,14 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse,
LeaderAndIsrResponse, StopReplicaResponse}
import org.apache.kafka.common.utils.Time
import org.apache.zookeeper.KeeperException
-import org.apache.zookeeper.KeeperException.{Code, NodeExistsException}
+import org.apache.zookeeper.KeeperException.Code
import scala.collection._
import scala.util.Try
object KafkaController extends Logging {
- val InitialControllerEpoch = 1
- val InitialControllerEpochZkVersion = 1
+ val InitialControllerEpoch = 0
+ val InitialControllerEpochZkVersion = 0
/**
* ControllerEventThread will shutdown once it sees this event
@@ -52,6 +52,12 @@ object KafkaController extends Logging {
override def process(): Unit = ()
}
+ // Used only by test
+ private[controller] case class AwaitOnLatch(latch: CountDownLatch) extends
ControllerEvent {
+ override def state: ControllerState = ControllerState.ControllerChange
+ override def process(): Unit = latch.await()
+ }
+
}
class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time:
Time, metrics: Metrics, initialBrokerInfo: BrokerInfo,
@@ -70,7 +76,7 @@ class KafkaController(val config: KafkaConfig, zkClient:
KafkaZkClient, time: Ti
// visible for testing
private[controller] val eventManager = new
ControllerEventManager(config.brokerId,
- controllerContext.stats.rateAndTimeMetrics, _ => updateMetrics())
+ controllerContext.stats.rateAndTimeMetrics, _ => updateMetrics(), () =>
maybeResign())
val topicDeletionManager = new TopicDeletionManager(this, eventManager,
zkClient)
private val brokerRequestBatch = new ControllerBrokerRequestBatch(this,
stateChangeLogger)
@@ -214,21 +220,15 @@ class KafkaController(val config: KafkaConfig, zkClient:
KafkaZkClient, time: Ti
/**
* This callback is invoked by the zookeeper leader elector on electing the
current broker as the new controller.
* It does the following things on the become-controller state change -
- * 1. Registers controller epoch changed listener
- * 2. Increments the controller epoch
- * 3. Initializes the controller's context object that holds cache objects
for current topics, live brokers and
+ * 1. Initializes the controller's context object that holds cache objects
for current topics, live brokers and
* leaders for all existing partitions.
- * 4. Starts the controller's channel manager
- * 5. Starts the replica state machine
- * 6. Starts the partition state machine
+ * 2. Starts the controller's channel manager
+ * 3. Starts the replica state machine
+ * 4. Starts the partition state machine
* If it encounters any unexpected exception/error while becoming
controller, it resigns as the current controller.
* This ensures another controller election will be triggered and there will
always be an actively serving controller
*/
private def onControllerFailover() {
- info("Reading controller epoch from ZooKeeper")
- readControllerEpochFromZooKeeper()
- info("Incrementing controller epoch in ZooKeeper")
- incrementControllerEpoch()
info("Registering handlers")
// before reading source of truth from zookeeper, register the listeners
to get broker/topic callbacks
@@ -239,9 +239,9 @@ class KafkaController(val config: KafkaConfig, zkClient:
KafkaZkClient, time: Ti
nodeChangeHandlers.foreach(zkClient.registerZNodeChangeHandlerAndCheckExistence)
info("Deleting log dir event notifications")
- zkClient.deleteLogDirEventNotifications()
+ zkClient.deleteLogDirEventNotifications(controllerContext.epochZkVersion)
info("Deleting isr change notifications")
- zkClient.deleteIsrChangeNotifications()
+ zkClient.deleteIsrChangeNotifications(controllerContext.epochZkVersion)
info("Initializing controller context")
initializeControllerContext()
info("Fetching topic deletions in progress")
@@ -599,6 +599,9 @@ class KafkaController(val config: KafkaConfig, zkClient:
KafkaZkClient, time: Ti
topicDeletionManager.markTopicIneligibleForDeletion(Set(topic))
onPartitionReassignment(tp, reassignedPartitionContext)
} catch {
+ case e: ControllerMovedException =>
+ error(s"Error completing reassignment of partition $tp because
controller has moved to another broker", e)
+ throw e
case e: Throwable =>
error(s"Error completing reassignment of partition $tp", e)
// remove the partition from the admin path to unblock the
admin client
@@ -619,41 +622,15 @@ class KafkaController(val config: KafkaConfig, zkClient:
KafkaZkClient, time: Ti
try {
partitionStateMachine.handleStateChanges(partitions.toSeq,
OnlinePartition, Option(PreferredReplicaPartitionLeaderElectionStrategy))
} catch {
+ case e: ControllerMovedException =>
+ error(s"Error completing preferred replica leader election for
partitions ${partitions.mkString(",")} because controller has moved to another
broker.", e)
+ throw e
case e: Throwable => error(s"Error completing preferred replica leader
election for partitions ${partitions.mkString(",")}", e)
} finally {
removePartitionsFromPreferredReplicaElection(partitions,
isTriggeredByAutoRebalance)
}
}
- private def incrementControllerEpoch(): Unit = {
- val newControllerEpoch = controllerContext.epoch + 1
- val setDataResponse = zkClient.setControllerEpochRaw(newControllerEpoch,
controllerContext.epochZkVersion)
- setDataResponse.resultCode match {
- case Code.OK =>
- controllerContext.epochZkVersion = setDataResponse.stat.getVersion
- controllerContext.epoch = newControllerEpoch
- case Code.NONODE =>
- // if path doesn't exist, this is the first controller whose epoch
should be 1
- // the following call can still fail if another controller gets
elected between checking if the path exists and
- // trying to create the controller epoch path
- val createResponse =
zkClient.createControllerEpochRaw(KafkaController.InitialControllerEpoch)
- createResponse.resultCode match {
- case Code.OK =>
- controllerContext.epoch = KafkaController.InitialControllerEpoch
- controllerContext.epochZkVersion =
KafkaController.InitialControllerEpochZkVersion
- case Code.NODEEXISTS =>
- throw new ControllerMovedException("Controller moved to another
broker. Aborting controller startup procedure")
- case _ =>
- val exception = createResponse.resultException.get
- error("Error while incrementing controller epoch", exception)
- throw exception
- }
- case _ =>
- throw new ControllerMovedException("Controller moved to another
broker. Aborting controller startup procedure")
- }
- info(s"Epoch incremented to ${controllerContext.epoch}")
- }
-
private def initializeControllerContext() {
// update controller cache with delete topic information
controllerContext.liveBrokers = zkClient.getAllBrokersInCluster.toSet
@@ -783,7 +760,7 @@ class KafkaController(val config: KafkaConfig, zkClient:
KafkaZkClient, time: Ti
private def updateAssignedReplicasForPartition(partition: TopicPartition,
replicas: Seq[Int]) {
controllerContext.updatePartitionReplicaAssignment(partition, replicas)
- val setDataResponse = zkClient.setTopicAssignmentRaw(partition.topic,
controllerContext.partitionReplicaAssignmentForTopic(partition.topic))
+ val setDataResponse = zkClient.setTopicAssignmentRaw(partition.topic,
controllerContext.partitionReplicaAssignmentForTopic(partition.topic),
controllerContext.epochZkVersion)
setDataResponse.resultCode match {
case Code.OK =>
info(s"Updated assigned replicas for partition $partition being
reassigned to ${replicas.mkString(",")}")
@@ -844,16 +821,6 @@ class KafkaController(val config: KafkaConfig, zkClient:
KafkaZkClient, time: Ti
controllerContext.partitionsBeingReassigned.values.foreach(_.unregisterReassignIsrChangeHandler(zkClient))
}
- private def readControllerEpochFromZooKeeper() {
- // initialize the controller epoch and zk version by reading from zookeeper
- val epochAndStatOpt = zkClient.getControllerEpoch
- epochAndStatOpt.foreach { case (epoch, stat) =>
- controllerContext.epoch = epoch
- controllerContext.epochZkVersion = stat.getVersion
- info(s"Initialized controller epoch to ${controllerContext.epoch} and zk
version ${controllerContext.epochZkVersion}")
- }
- }
-
/**
* Remove partition from partitions being reassigned in ZooKeeper and
ControllerContext. If the partition reassignment
* is complete (i.e. there is no other partition with a reassignment in
progress), the reassign_partitions znode
@@ -874,12 +841,12 @@ class KafkaController(val config: KafkaConfig, zkClient:
KafkaZkClient, time: Ti
// write the new list to zookeeper
if (updatedPartitionsBeingReassigned.isEmpty) {
info(s"No more partitions need to be reassigned. Deleting zk path
${ReassignPartitionsZNode.path}")
- zkClient.deletePartitionReassignment()
+ zkClient.deletePartitionReassignment(controllerContext.epochZkVersion)
// Ensure we detect future reassignments
eventManager.put(PartitionReassignment)
} else {
val reassignment =
updatedPartitionsBeingReassigned.mapValues(_.newReplicas)
- try zkClient.setOrCreatePartitionReassignment(reassignment)
+ try zkClient.setOrCreatePartitionReassignment(reassignment,
controllerContext.epochZkVersion)
catch {
case e: KeeperException => throw new AdminOperationException(e)
}
@@ -902,7 +869,7 @@ class KafkaController(val config: KafkaConfig, zkClient:
KafkaZkClient, time: Ti
}
}
if (!isTriggeredByAutoRebalance) {
- zkClient.deletePreferredReplicaElection()
+ zkClient.deletePreferredReplicaElection(controllerContext.epochZkVersion)
// Ensure we detect future preferred replica leader elections
eventManager.put(PreferredReplicaLeaderElection)
}
@@ -955,7 +922,7 @@ class KafkaController(val config: KafkaConfig, zkClient:
KafkaZkClient, time: Ti
val newLeaderAndIsr = leaderAndIsr.newEpochAndZkVersion
// update the new leadership decision in zookeeper or retry
val UpdateLeaderAndIsrResult(successfulUpdates, _, failedUpdates) =
- zkClient.updateLeaderAndIsr(immutable.Map(partition ->
newLeaderAndIsr), epoch)
+ zkClient.updateLeaderAndIsr(immutable.Map(partition ->
newLeaderAndIsr), epoch, controllerContext.epochZkVersion)
if (successfulUpdates.contains(partition)) {
val finalLeaderAndIsr = successfulUpdates(partition)
finalLeaderIsrAndControllerEpoch =
Some(LeaderIsrAndControllerEpoch(finalLeaderAndIsr, epoch))
@@ -1204,13 +1171,32 @@ class KafkaController(val config: KafkaConfig,
zkClient: KafkaZkClient, time: Ti
}
private def triggerControllerMove(): Unit = {
- onControllerResignation()
- activeControllerId = -1
- zkClient.deleteController()
+ activeControllerId = zkClient.getControllerId.getOrElse(-1)
+ if (!isActive) {
+ warn("Controller has already moved when trying to trigger controller
movement")
+ return
+ }
+ try {
+ val expectedControllerEpochZkVersion = controllerContext.epochZkVersion
+ activeControllerId = -1
+ onControllerResignation()
+ zkClient.deleteController(expectedControllerEpochZkVersion)
+ } catch {
+ case _: ControllerMovedException =>
+ warn("Controller has already moved when trying to trigger controller
movement")
+ }
+ }
+
+ private def maybeResign(): Unit = {
+ val wasActiveBeforeChange = isActive
+
zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)
+ activeControllerId = zkClient.getControllerId.getOrElse(-1)
+ if (wasActiveBeforeChange && !isActive) {
+ onControllerResignation()
+ }
}
private def elect(): Unit = {
- val timestamp = time.milliseconds
activeControllerId = zkClient.getControllerId.getOrElse(-1)
/*
* We can get here during the initial startup and the handleDeleted ZK
callback. Because of the potential race condition,
@@ -1223,22 +1209,27 @@ class KafkaController(val config: KafkaConfig,
zkClient: KafkaZkClient, time: Ti
}
try {
- zkClient.registerController(config.brokerId, timestamp)
- info(s"${config.brokerId} successfully elected as the controller")
+ val (epoch, epochZkVersion) =
zkClient.registerControllerAndIncrementControllerEpoch(config.brokerId)
+ controllerContext.epoch = epoch
+ controllerContext.epochZkVersion = epochZkVersion
activeControllerId = config.brokerId
+
+ info(s"${config.brokerId} successfully elected as the controller. Epoch
incremented to ${controllerContext.epoch} " +
+ s"and epoch zk version is now ${controllerContext.epochZkVersion}")
+
onControllerFailover()
} catch {
- case _: NodeExistsException =>
- // If someone else has written the path, then
- activeControllerId = zkClient.getControllerId.getOrElse(-1)
+ case e: ControllerMovedException =>
+ maybeResign()
if (activeControllerId != -1)
- debug(s"Broker $activeControllerId was elected as controller instead
of broker ${config.brokerId}")
+ debug(s"Broker $activeControllerId was elected as controller instead
of broker ${config.brokerId}", e)
else
- warn("A controller has been elected but just resigned, this will
result in another round of election")
+ warn("A controller has been elected but just resigned, this will
result in another round of election", e)
- case e2: Throwable =>
- error(s"Error while electing or becoming controller on broker
${config.brokerId}", e2)
+ case t: Throwable =>
+ error(s"Error while electing or becoming controller on broker
${config.brokerId}. " +
+ s"Trigger controller movement immediately", t)
triggerControllerMove()
}
}
@@ -1321,7 +1312,7 @@ class KafkaController(val config: KafkaConfig, zkClient:
KafkaZkClient, time: Ti
onBrokerLogDirFailure(brokerIds)
} finally {
// delete processed children
- zkClient.deleteLogDirEventNotifications(sequenceNumbers)
+ zkClient.deleteLogDirEventNotifications(sequenceNumbers,
controllerContext.epochZkVersion)
}
}
}
@@ -1336,7 +1327,7 @@ class KafkaController(val config: KafkaConfig, zkClient:
KafkaZkClient, time: Ti
val existingPartitionReplicaAssignment =
newPartitionReplicaAssignment.filter(p =>
existingPartitions.contains(p._1.partition.toString))
- zkClient.setTopicAssignment(topic, existingPartitionReplicaAssignment)
+ zkClient.setTopicAssignment(topic, existingPartitionReplicaAssignment,
controllerContext.epochZkVersion)
}
override def process(): Unit = {
@@ -1377,7 +1368,7 @@ class KafkaController(val config: KafkaConfig, zkClient:
KafkaZkClient, time: Ti
val nonExistentTopics = topicsToBeDeleted -- controllerContext.allTopics
if (nonExistentTopics.nonEmpty) {
warn(s"Ignoring request to delete non-existing topics
${nonExistentTopics.mkString(",")}")
- zkClient.deleteTopicDeletions(nonExistentTopics.toSeq)
+ zkClient.deleteTopicDeletions(nonExistentTopics.toSeq,
controllerContext.epochZkVersion)
}
topicsToBeDeleted --= nonExistentTopics
if (config.deleteTopicEnable) {
@@ -1396,7 +1387,7 @@ class KafkaController(val config: KafkaConfig, zkClient:
KafkaZkClient, time: Ti
} else {
// If delete topic is disabled remove entries under zookeeper path :
/admin/delete_topics
info(s"Removing $topicsToBeDeleted since delete topic is disabled")
- zkClient.deleteTopicDeletions(topicsToBeDeleted.toSeq)
+ zkClient.deleteTopicDeletions(topicsToBeDeleted.toSeq,
controllerContext.epochZkVersion)
}
}
}
@@ -1469,7 +1460,7 @@ class KafkaController(val config: KafkaConfig, zkClient:
KafkaZkClient, time: Ti
}
} finally {
// delete the notifications
- zkClient.deleteIsrChangeNotifications(sequenceNumbers)
+ zkClient.deleteIsrChangeNotifications(sequenceNumbers,
controllerContext.epochZkVersion)
}
}
@@ -1504,12 +1495,7 @@ class KafkaController(val config: KafkaConfig, zkClient:
KafkaZkClient, time: Ti
override def state = ControllerState.ControllerChange
override def process(): Unit = {
- val wasActiveBeforeChange = isActive
-
zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)
- activeControllerId = zkClient.getControllerId.getOrElse(-1)
- if (wasActiveBeforeChange && !isActive) {
- onControllerResignation()
- }
+ maybeResign()
}
}
@@ -1517,12 +1503,7 @@ class KafkaController(val config: KafkaConfig, zkClient:
KafkaZkClient, time: Ti
override def state = ControllerState.ControllerChange
override def process(): Unit = {
- val wasActiveBeforeChange = isActive
-
zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)
- activeControllerId = zkClient.getControllerId.getOrElse(-1)
- if (wasActiveBeforeChange && !isActive) {
- onControllerResignation()
- }
+ maybeResign()
elect()
}
}
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index 3a0ac19..663ee8d 100755
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -23,6 +23,7 @@ import kafka.utils.Logging
import kafka.zk.{KafkaZkClient, TopicPartitionStateZNode}
import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.ControllerMovedException
import org.apache.zookeeper.KeeperException
import org.apache.zookeeper.KeeperException.Code
@@ -132,6 +133,9 @@ class PartitionStateMachine(config: KafkaConfig,
doHandleStateChanges(partitions, targetState,
partitionLeaderElectionStrategyOpt)
controllerBrokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch)
} catch {
+ case e: ControllerMovedException =>
+ error(s"Controller moved to another broker when moving some
partitions to $targetState state", e)
+ throw e
case e: Throwable => error(s"Error while moving some partitions to
$targetState state", e)
}
}
@@ -250,8 +254,11 @@ class PartitionStateMachine(config: KafkaConfig,
partition -> leaderIsrAndControllerEpoch
}.toMap
val createResponses = try {
- zkClient.createTopicPartitionStatesRaw(leaderIsrAndControllerEpochs)
+ zkClient.createTopicPartitionStatesRaw(leaderIsrAndControllerEpochs,
controllerContext.epochZkVersion)
} catch {
+ case e: ControllerMovedException =>
+ error("Controller moved to another broker when trying to create the
topic partition state znode", e)
+ throw e
case e: Exception =>
partitionsWithLiveReplicas.foreach { case (partition,_) =>
logFailedStateChange(partition, partitionState(partition), NewPartition, e) }
Seq.empty
@@ -361,7 +368,7 @@ class PartitionStateMachine(config: KafkaConfig,
val recipientsPerPartition = partitionsWithLeaders.map { case (partition,
_, recipients) => partition -> recipients }.toMap
val adjustedLeaderAndIsrs = partitionsWithLeaders.map { case (partition,
leaderAndIsrOpt, _) => partition -> leaderAndIsrOpt.get }.toMap
val UpdateLeaderAndIsrResult(successfulUpdates, updatesToRetry,
failedUpdates) = zkClient.updateLeaderAndIsr(
- adjustedLeaderAndIsrs, controllerContext.epoch)
+ adjustedLeaderAndIsrs, controllerContext.epoch,
controllerContext.epochZkVersion)
successfulUpdates.foreach { case (partition, leaderAndIsr) =>
val replicas = controllerContext.partitionReplicaAssignment(partition)
val leaderIsrAndControllerEpoch =
LeaderIsrAndControllerEpoch(leaderAndIsr, controllerContext.epoch)
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index 1ab8a43..433ab56 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -23,6 +23,7 @@ import kafka.utils.Logging
import kafka.zk.{KafkaZkClient, TopicPartitionStateZNode}
import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.ControllerMovedException
import org.apache.zookeeper.KeeperException.Code
import scala.collection.mutable
@@ -106,6 +107,9 @@ class ReplicaStateMachine(config: KafkaConfig,
}
controllerBrokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch)
} catch {
+ case e: ControllerMovedException =>
+ error(s"Controller moved to another broker when moving some replicas
to $targetState state", e)
+ throw e
case e: Throwable => error(s"Error while moving some replicas to
$targetState state", e)
}
}
@@ -299,7 +303,7 @@ class ReplicaStateMachine(config: KafkaConfig,
leaderAndIsr.newLeaderAndIsr(newLeader, adjustedIsr)
}
val UpdateLeaderAndIsrResult(successfulUpdates, updatesToRetry,
failedUpdates) = zkClient.updateLeaderAndIsr(
- adjustedLeaderAndIsrs, controllerContext.epoch)
+ adjustedLeaderAndIsrs, controllerContext.epoch,
controllerContext.epochZkVersion)
val exceptionsForPartitionsWithNoLeaderAndIsrInZk =
partitionsWithNoLeaderAndIsrInZk.flatMap { partition =>
if (!topicDeletionManager.isTopicQueuedUpForDeletion(partition.topic)) {
val exception = new StateChangeFailedException(s"Failed to change
state of replica $replicaId for partition $partition since the leader and isr
path in zookeeper is empty")
diff --git a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
index 8d93ef2..1ef79be 100755
--- a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
+++ b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
@@ -92,7 +92,7 @@ class TopicDeletionManager(controller: KafkaController,
} else {
// if delete topic is disabled clean the topic entries under
/admin/delete_topics
info(s"Removing $initialTopicsToBeDeleted since delete topic is
disabled")
- zkClient.deleteTopicDeletions(initialTopicsToBeDeleted.toSeq)
+ zkClient.deleteTopicDeletions(initialTopicsToBeDeleted.toSeq,
controllerContext.epochZkVersion)
}
}
@@ -251,9 +251,9 @@ class TopicDeletionManager(controller: KafkaController,
controller.replicaStateMachine.handleStateChanges(replicasForDeletedTopic.toSeq,
NonExistentReplica)
topicsToBeDeleted -= topic
topicsWithDeletionStarted -= topic
- zkClient.deleteTopicZNode(topic)
- zkClient.deleteTopicConfigs(Seq(topic))
- zkClient.deleteTopicDeletions(Seq(topic))
+ zkClient.deleteTopicZNode(topic, controllerContext.epochZkVersion)
+ zkClient.deleteTopicConfigs(Seq(topic), controllerContext.epochZkVersion)
+ zkClient.deleteTopicDeletions(Seq(topic), controllerContext.epochZkVersion)
controllerContext.removeTopic(topic)
}
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 59581e7..2393daa 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -166,7 +166,7 @@ class ReplicaManager(val config: KafkaConfig,
}
/* epoch of the controller that last changed the leader */
- @volatile var controllerEpoch: Int = KafkaController.InitialControllerEpoch
- 1
+ @volatile var controllerEpoch: Int = KafkaController.InitialControllerEpoch
private val localBrokerId = config.brokerId
private val allPartitions = new Pool[TopicPartition, Partition](valueFactory
= Some(tp =>
new Partition(tp.topic, tp.partition, time, this)))
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index c807965..a12abb4 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -21,24 +21,27 @@ import java.util.Properties
import com.yammer.metrics.core.MetricName
import kafka.api.LeaderAndIsr
import kafka.cluster.Broker
-import kafka.controller.LeaderIsrAndControllerEpoch
+import kafka.controller.{KafkaController, LeaderIsrAndControllerEpoch}
import kafka.log.LogConfig
import kafka.metrics.KafkaMetricsGroup
-import kafka.security.auth.SimpleAclAuthorizer.{VersionedAcls, NoAcls}
+import kafka.security.auth.SimpleAclAuthorizer.{NoAcls, VersionedAcls}
import kafka.security.auth.{Acl, Resource, ResourceType}
import kafka.server.ConfigType
import kafka.utils.Logging
import kafka.zookeeper._
import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.apache.kafka.common.resource.PatternType
+import org.apache.kafka.common.errors.ControllerMovedException
import org.apache.kafka.common.security.token.delegation.{DelegationToken,
TokenInformation}
import org.apache.kafka.common.utils.{Time, Utils}
-import org.apache.zookeeper.KeeperException.{Code, NodeExistsException}
+import org.apache.zookeeper.KeeperException.{BadVersionException, Code,
ConnectionLossException, NodeExistsException}
+import org.apache.zookeeper.OpResult.{ErrorResult, SetDataResult}
import org.apache.zookeeper.data.{ACL, Stat}
import org.apache.zookeeper.{CreateMode, KeeperException, ZooKeeper}
import scala.collection.mutable.ArrayBuffer
import scala.collection.{Seq, mutable}
+import scala.collection.JavaConverters._
/**
* Provides higher level Kafka-specific operations on top of the pipelined
[[kafka.zookeeper.ZooKeeperClient]].
@@ -86,14 +89,75 @@ class KafkaZkClient private (zooKeeperClient:
ZooKeeperClient, isSecure: Boolean
}
/**
- * Registers a given broker in zookeeper as the controller.
+ * Registers a given broker in zookeeper as the controller and increments
controller epoch.
+ * @return the (updated controller epoch, epoch zkVersion) tuple
* @param controllerId the id of the broker that is to be registered as the
controller.
- * @param timestamp the timestamp of the controller election.
- * @throws KeeperException if an error is returned by ZooKeeper.
- */
- def registerController(controllerId: Int, timestamp: Long): Unit = {
- val path = ControllerZNode.path
- checkedEphemeralCreate(path, ControllerZNode.encode(controllerId,
timestamp))
+ * @throws ControllerMovedException if fail to create /controller or fail to
increment controller epoch.
+ */
+ def registerControllerAndIncrementControllerEpoch(controllerId: Int): (Int,
Int) = {
+ val timestamp = time.milliseconds()
+
+ // Read /controller_epoch to get the current controller epoch and
zkVersion,
+ // create /controller_epoch with initial value if not exists
+ val (curEpoch, curEpochZkVersion) = getControllerEpoch
+ .map(e => (e._1, e._2.getVersion))
+ .getOrElse(maybeCreateControllerEpochZNode())
+
+ // Create /controller and update /controller_epoch atomically
+ val newControllerEpoch = curEpoch + 1
+ val expectedControllerEpochZkVersion = curEpochZkVersion
+
+ debug(s"Try to create ${ControllerZNode.path} and increment controller
epoch to $newControllerEpoch with expected controller epoch zkVersion
$expectedControllerEpochZkVersion")
+
+ def checkControllerAndEpoch(): (Int, Int) = {
+ val curControllerId = getControllerId.getOrElse(throw new
ControllerMovedException(
+ s"The ephemeral node at ${ControllerZNode.path} went away while
checking whether the controller election succeeds. " +
+ s"Aborting controller startup procedure"))
+ if (controllerId == curControllerId) {
+ val (epoch, stat) = getControllerEpoch.getOrElse(
+ throw new IllegalStateException(s"${ControllerEpochZNode.path}
existed before but goes away while trying to read it"))
+
+ // If the epoch is the same as newControllerEpoch, it is safe to infer
that the returned epoch zkVersion
+ // is associated with the current broker during controller election
because we already knew that the zk
+ // transaction succeeds based on the controller znode verification.
Other rounds of controller
+ // election will result in larger epoch number written in zk.
+ if (epoch == newControllerEpoch)
+ return (newControllerEpoch, stat.getVersion)
+ }
+ throw new ControllerMovedException("Controller moved to another broker.
Aborting controller startup procedure")
+ }
+
+ def tryCreateControllerZNodeAndIncrementEpoch(): (Int, Int) = {
+ try {
+ val transaction = zooKeeperClient.createTransaction()
+ transaction.create(ControllerZNode.path,
ControllerZNode.encode(controllerId, timestamp),
+ acls(ControllerZNode.path).asJava, CreateMode.EPHEMERAL)
+ transaction.setData(ControllerEpochZNode.path,
ControllerEpochZNode.encode(newControllerEpoch),
expectedControllerEpochZkVersion)
+ val results = transaction.commit()
+ val setDataResult = results.get(1).asInstanceOf[SetDataResult]
+ (newControllerEpoch, setDataResult.getStat.getVersion)
+ } catch {
+ case _: NodeExistsException | _: BadVersionException =>
checkControllerAndEpoch()
+ case _: ConnectionLossException =>
+ zooKeeperClient.waitUntilConnected()
+ tryCreateControllerZNodeAndIncrementEpoch()
+ }
+ }
+
+ tryCreateControllerZNodeAndIncrementEpoch()
+ }
+
+ private def maybeCreateControllerEpochZNode(): (Int, Int) = {
+
createControllerEpochRaw(KafkaController.InitialControllerEpoch).resultCode
match {
+ case Code.OK =>
+ info(s"Successfully created ${ControllerEpochZNode.path} with initial
epoch ${KafkaController.InitialControllerEpoch}")
+ (KafkaController.InitialControllerEpoch,
KafkaController.InitialControllerEpochZkVersion)
+ case Code.NODEEXISTS =>
+ val (epoch, stat) = getControllerEpoch.getOrElse(throw new
IllegalStateException(s"${ControllerEpochZNode.path} existed before but goes
away while trying to read it"))
+ (epoch, stat.getVersion)
+ case code =>
+ throw KeeperException.create(code)
+ }
}
def updateBrokerInfo(brokerInfo: BrokerInfo): Unit = {
@@ -119,13 +183,15 @@ class KafkaZkClient private (zooKeeperClient:
ZooKeeperClient, isSecure: Boolean
/**
* Sets topic partition states for the given partitions.
* @param leaderIsrAndControllerEpochs the partition states of each
partition whose state we wish to set.
+ * @param expectedControllerEpochZkVersion expected controller epoch
zkVersion.
* @return sequence of SetDataResponse whose contexts are the partitions
they are associated with.
*/
- def setTopicPartitionStatesRaw(leaderIsrAndControllerEpochs:
Map[TopicPartition, LeaderIsrAndControllerEpoch]): Seq[SetDataResponse] = {
+ def setTopicPartitionStatesRaw(leaderIsrAndControllerEpochs:
Map[TopicPartition, LeaderIsrAndControllerEpoch],
expectedControllerEpochZkVersion: Int): Seq[SetDataResponse] = {
val setDataRequests = leaderIsrAndControllerEpochs.map { case (partition,
leaderIsrAndControllerEpoch) =>
val path = TopicPartitionStateZNode.path(partition)
val data = TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch)
- SetDataRequest(path, data,
leaderIsrAndControllerEpoch.leaderAndIsr.zkVersion, Some(partition))
+ SetDataRequest(path, data,
leaderIsrAndControllerEpoch.leaderAndIsr.zkVersion, Some(partition),
+ controllerZkVersionCheck(expectedControllerEpochZkVersion))
}
retryRequestsUntilConnected(setDataRequests.toSeq)
}
@@ -133,15 +199,16 @@ class KafkaZkClient private (zooKeeperClient:
ZooKeeperClient, isSecure: Boolean
/**
* Creates topic partition state znodes for the given partitions.
* @param leaderIsrAndControllerEpochs the partition states of each
partition whose state we wish to set.
+ * @param expectedControllerEpochZkVersion expected controller epoch
zkVersion.
* @return sequence of CreateResponse whose contexts are the partitions they
are associated with.
*/
- def createTopicPartitionStatesRaw(leaderIsrAndControllerEpochs:
Map[TopicPartition, LeaderIsrAndControllerEpoch]): Seq[CreateResponse] = {
-
createTopicPartitions(leaderIsrAndControllerEpochs.keys.map(_.topic).toSet.toSeq)
- createTopicPartition(leaderIsrAndControllerEpochs.keys.toSeq)
+ def createTopicPartitionStatesRaw(leaderIsrAndControllerEpochs:
Map[TopicPartition, LeaderIsrAndControllerEpoch],
expectedControllerEpochZkVersion: Int): Seq[CreateResponse] = {
+
createTopicPartitions(leaderIsrAndControllerEpochs.keys.map(_.topic).toSet.toSeq,
expectedControllerEpochZkVersion)
+ createTopicPartition(leaderIsrAndControllerEpochs.keys.toSeq,
expectedControllerEpochZkVersion)
val createRequests = leaderIsrAndControllerEpochs.map { case (partition,
leaderIsrAndControllerEpoch) =>
val path = TopicPartitionStateZNode.path(partition)
val data = TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch)
- CreateRequest(path, data, acls(path), CreateMode.PERSISTENT,
Some(partition))
+ CreateRequest(path, data, acls(path), CreateMode.PERSISTENT,
Some(partition), controllerZkVersionCheck(expectedControllerEpochZkVersion))
}
retryRequestsUntilConnected(createRequests.toSeq)
}
@@ -172,9 +239,10 @@ class KafkaZkClient private (zooKeeperClient:
ZooKeeperClient, isSecure: Boolean
* Update the partition states of multiple partitions in zookeeper.
* @param leaderAndIsrs The partition states to update.
* @param controllerEpoch The current controller epoch.
+ * @param expectedControllerEpochZkVersion expected controller epoch
zkVersion.
* @return UpdateLeaderAndIsrResult instance containing per partition
results.
*/
- def updateLeaderAndIsr(leaderAndIsrs: Map[TopicPartition, LeaderAndIsr],
controllerEpoch: Int): UpdateLeaderAndIsrResult = {
+ def updateLeaderAndIsr(leaderAndIsrs: Map[TopicPartition, LeaderAndIsr],
controllerEpoch: Int, expectedControllerEpochZkVersion: Int):
UpdateLeaderAndIsrResult = {
val successfulUpdates = mutable.Map.empty[TopicPartition, LeaderAndIsr]
val updatesToRetry = mutable.Buffer.empty[TopicPartition]
val failed = mutable.Map.empty[TopicPartition, Exception]
@@ -182,8 +250,9 @@ class KafkaZkClient private (zooKeeperClient:
ZooKeeperClient, isSecure: Boolean
partition -> LeaderIsrAndControllerEpoch(leaderAndIsr, controllerEpoch)
}
val setDataResponses = try {
- setTopicPartitionStatesRaw(leaderIsrAndControllerEpochs)
+ setTopicPartitionStatesRaw(leaderIsrAndControllerEpochs,
expectedControllerEpochZkVersion)
} catch {
+ case e: ControllerMovedException => throw e
case e: Exception =>
leaderAndIsrs.keys.foreach(partition => failed.put(partition, e))
return UpdateLeaderAndIsrResult(successfulUpdates.toMap,
updatesToRetry, failed.toMap)
@@ -381,10 +450,12 @@ class KafkaZkClient private (zooKeeperClient:
ZooKeeperClient, isSecure: Boolean
* Sets the topic znode with the given assignment.
* @param topic the topic whose assignment is being set.
* @param assignment the partition to replica mapping to set for the given
topic
+ * @param expectedControllerEpochZkVersion expected controller epoch
zkVersion.
* @return SetDataResponse
*/
- def setTopicAssignmentRaw(topic: String, assignment:
collection.Map[TopicPartition, Seq[Int]]): SetDataResponse = {
- val setDataRequest = SetDataRequest(TopicZNode.path(topic),
TopicZNode.encode(assignment), ZkVersion.MatchAnyVersion)
+ def setTopicAssignmentRaw(topic: String, assignment:
collection.Map[TopicPartition, Seq[Int]], expectedControllerEpochZkVersion:
Int): SetDataResponse = {
+ val setDataRequest = SetDataRequest(TopicZNode.path(topic),
TopicZNode.encode(assignment), ZkVersion.MatchAnyVersion,
+ zkVersionCheck =
controllerZkVersionCheck(expectedControllerEpochZkVersion))
retryRequestUntilConnected(setDataRequest)
}
@@ -392,10 +463,11 @@ class KafkaZkClient private (zooKeeperClient:
ZooKeeperClient, isSecure: Boolean
* Sets the topic znode with the given assignment.
* @param topic the topic whose assignment is being set.
* @param assignment the partition to replica mapping to set for the given
topic
+ * @param expectedControllerEpochZkVersion expected controller epoch
zkVersion.
* @throws KeeperException if there is an error while setting assignment
*/
- def setTopicAssignment(topic: String, assignment: Map[TopicPartition,
Seq[Int]]) = {
- val setDataResponse = setTopicAssignmentRaw(topic, assignment)
+ def setTopicAssignment(topic: String, assignment: Map[TopicPartition,
Seq[Int]], expectedControllerEpochZkVersion: Int = ZkVersion.MatchAnyVersion) =
{
+ val setDataResponse = setTopicAssignmentRaw(topic, assignment,
expectedControllerEpochZkVersion)
setDataResponse.maybeThrow
}
@@ -443,11 +515,12 @@ class KafkaZkClient private (zooKeeperClient:
ZooKeeperClient, isSecure: Boolean
/**
* Deletes all log dir event notifications.
+ * @param expectedControllerEpochZkVersion expected controller epoch
zkVersion.
*/
- def deleteLogDirEventNotifications(): Unit = {
+ def deleteLogDirEventNotifications(expectedControllerEpochZkVersion: Int):
Unit = {
val getChildrenResponse =
retryRequestUntilConnected(GetChildrenRequest(LogDirEventNotificationZNode.path))
if (getChildrenResponse.resultCode == Code.OK) {
-
deleteLogDirEventNotifications(getChildrenResponse.children.map(LogDirEventNotificationSequenceZNode.sequenceNumber))
+
deleteLogDirEventNotifications(getChildrenResponse.children.map(LogDirEventNotificationSequenceZNode.sequenceNumber),
expectedControllerEpochZkVersion)
} else if (getChildrenResponse.resultCode != Code.NONODE) {
getChildrenResponse.maybeThrow
}
@@ -456,10 +529,12 @@ class KafkaZkClient private (zooKeeperClient:
ZooKeeperClient, isSecure: Boolean
/**
* Deletes the log dir event notifications associated with the given
sequence numbers.
* @param sequenceNumbers the sequence numbers associated with the log dir
event notifications to be deleted.
+ * @param expectedControllerEpochZkVersion expected controller epoch
zkVersion.
*/
- def deleteLogDirEventNotifications(sequenceNumbers: Seq[String]): Unit = {
+ def deleteLogDirEventNotifications(sequenceNumbers: Seq[String],
expectedControllerEpochZkVersion: Int): Unit = {
val deleteRequests = sequenceNumbers.map { sequenceNumber =>
- DeleteRequest(LogDirEventNotificationSequenceZNode.path(sequenceNumber),
ZkVersion.MatchAnyVersion)
+ DeleteRequest(LogDirEventNotificationSequenceZNode.path(sequenceNumber),
ZkVersion.MatchAnyVersion,
+ zkVersionCheck =
controllerZkVersionCheck(expectedControllerEpochZkVersion))
}
retryRequestsUntilConnected(deleteRequests)
}
@@ -677,9 +752,11 @@ class KafkaZkClient private (zooKeeperClient:
ZooKeeperClient, isSecure: Boolean
/**
* Remove the given topics from the topics marked for deletion.
* @param topics the topics to remove.
+ * @param expectedControllerEpochZkVersion expected controller epoch
zkVersion.
*/
- def deleteTopicDeletions(topics: Seq[String]): Unit = {
- val deleteRequests = topics.map(topic =>
DeleteRequest(DeleteTopicsTopicZNode.path(topic), ZkVersion.MatchAnyVersion))
+ def deleteTopicDeletions(topics: Seq[String],
expectedControllerEpochZkVersion: Int): Unit = {
+ val deleteRequests = topics.map(topic =>
DeleteRequest(DeleteTopicsTopicZNode.path(topic), ZkVersion.MatchAnyVersion,
+ zkVersionCheck =
controllerZkVersionCheck(expectedControllerEpochZkVersion)))
retryRequestsUntilConnected(deleteRequests)
}
@@ -708,18 +785,20 @@ class KafkaZkClient private (zooKeeperClient:
ZooKeeperClient, isSecure: Boolean
* exists or not.
*
* @param reassignment the reassignment to set on the reassignment znode
+ * @param expectedControllerEpochZkVersion expected controller epoch
zkVersion.
* @throws KeeperException if there is an error while setting or creating
the znode
*/
- def setOrCreatePartitionReassignment(reassignment:
collection.Map[TopicPartition, Seq[Int]]): Unit = {
+ def setOrCreatePartitionReassignment(reassignment:
collection.Map[TopicPartition, Seq[Int]], expectedControllerEpochZkVersion:
Int): Unit = {
def set(reassignmentData: Array[Byte]): SetDataResponse = {
- val setDataRequest = SetDataRequest(ReassignPartitionsZNode.path,
reassignmentData, ZkVersion.MatchAnyVersion)
+ val setDataRequest = SetDataRequest(ReassignPartitionsZNode.path,
reassignmentData, ZkVersion.MatchAnyVersion,
+ zkVersionCheck =
controllerZkVersionCheck(expectedControllerEpochZkVersion))
retryRequestUntilConnected(setDataRequest)
}
def create(reassignmentData: Array[Byte]): CreateResponse = {
val createRequest = CreateRequest(ReassignPartitionsZNode.path,
reassignmentData, acls(ReassignPartitionsZNode.path),
- CreateMode.PERSISTENT)
+ CreateMode.PERSISTENT, zkVersionCheck =
controllerZkVersionCheck(expectedControllerEpochZkVersion))
retryRequestUntilConnected(createRequest)
}
@@ -744,9 +823,11 @@ class KafkaZkClient private (zooKeeperClient:
ZooKeeperClient, isSecure: Boolean
/**
* Deletes the partition reassignment znode.
+ * @param expectedControllerEpochZkVersion expected controller epoch
zkVersion.
*/
- def deletePartitionReassignment(): Unit = {
- val deleteRequest = DeleteRequest(ReassignPartitionsZNode.path,
ZkVersion.MatchAnyVersion)
+ def deletePartitionReassignment(expectedControllerEpochZkVersion: Int): Unit
= {
+ val deleteRequest = DeleteRequest(ReassignPartitionsZNode.path,
ZkVersion.MatchAnyVersion,
+ zkVersionCheck =
controllerZkVersionCheck(expectedControllerEpochZkVersion))
retryRequestUntilConnected(deleteRequest)
}
@@ -851,11 +932,12 @@ class KafkaZkClient private (zooKeeperClient:
ZooKeeperClient, isSecure: Boolean
/**
* Deletes all isr change notifications.
+ * @param expectedControllerEpochZkVersion expected controller epoch
zkVersion.
*/
- def deleteIsrChangeNotifications(): Unit = {
+ def deleteIsrChangeNotifications(expectedControllerEpochZkVersion: Int):
Unit = {
val getChildrenResponse =
retryRequestUntilConnected(GetChildrenRequest(IsrChangeNotificationZNode.path))
if (getChildrenResponse.resultCode == Code.OK) {
-
deleteIsrChangeNotifications(getChildrenResponse.children.map(IsrChangeNotificationSequenceZNode.sequenceNumber))
+
deleteIsrChangeNotifications(getChildrenResponse.children.map(IsrChangeNotificationSequenceZNode.sequenceNumber),
expectedControllerEpochZkVersion)
} else if (getChildrenResponse.resultCode != Code.NONODE) {
getChildrenResponse.maybeThrow
}
@@ -864,10 +946,12 @@ class KafkaZkClient private (zooKeeperClient:
ZooKeeperClient, isSecure: Boolean
/**
* Deletes the isr change notifications associated with the given sequence
numbers.
* @param sequenceNumbers the sequence numbers associated with the isr
change notifications to be deleted.
+ * @param expectedControllerEpochZkVersion expected controller epoch
zkVersion.
*/
- def deleteIsrChangeNotifications(sequenceNumbers: Seq[String]): Unit = {
+ def deleteIsrChangeNotifications(sequenceNumbers: Seq[String],
expectedControllerEpochZkVersion: Int): Unit = {
val deleteRequests = sequenceNumbers.map { sequenceNumber =>
- DeleteRequest(IsrChangeNotificationSequenceZNode.path(sequenceNumber),
ZkVersion.MatchAnyVersion)
+ DeleteRequest(IsrChangeNotificationSequenceZNode.path(sequenceNumber),
ZkVersion.MatchAnyVersion,
+ zkVersionCheck =
controllerZkVersionCheck(expectedControllerEpochZkVersion))
}
retryRequestsUntilConnected(deleteRequests)
}
@@ -897,9 +981,11 @@ class KafkaZkClient private (zooKeeperClient:
ZooKeeperClient, isSecure: Boolean
/**
* Deletes the preferred replica election znode.
+ * @param expectedControllerEpochZkVersion expected controller epoch
zkVersion.
*/
- def deletePreferredReplicaElection(): Unit = {
- val deleteRequest = DeleteRequest(PreferredReplicaElectionZNode.path,
ZkVersion.MatchAnyVersion)
+ def deletePreferredReplicaElection(expectedControllerEpochZkVersion: Int):
Unit = {
+ val deleteRequest = DeleteRequest(PreferredReplicaElectionZNode.path,
ZkVersion.MatchAnyVersion,
+ zkVersionCheck =
controllerZkVersionCheck(expectedControllerEpochZkVersion))
retryRequestUntilConnected(deleteRequest)
}
@@ -919,9 +1005,11 @@ class KafkaZkClient private (zooKeeperClient:
ZooKeeperClient, isSecure: Boolean
/**
* Deletes the controller znode.
+ * @param expectedControllerEpochZkVersion expected controller epoch
zkVersion.
*/
- def deleteController(): Unit = {
- val deleteRequest = DeleteRequest(ControllerZNode.path,
ZkVersion.MatchAnyVersion)
+ def deleteController(expectedControllerEpochZkVersion: Int): Unit = {
+ val deleteRequest = DeleteRequest(ControllerZNode.path,
ZkVersion.MatchAnyVersion,
+ zkVersionCheck =
controllerZkVersionCheck(expectedControllerEpochZkVersion))
retryRequestUntilConnected(deleteRequest)
}
@@ -944,17 +1032,20 @@ class KafkaZkClient private (zooKeeperClient:
ZooKeeperClient, isSecure: Boolean
/**
* Recursively deletes the topic znode.
* @param topic the topic whose topic znode we wish to delete.
+ * @param expectedControllerEpochZkVersion expected controller epoch
zkVersion.
*/
- def deleteTopicZNode(topic: String): Unit = {
- deleteRecursive(TopicZNode.path(topic))
+ def deleteTopicZNode(topic: String, expectedControllerEpochZkVersion: Int):
Unit = {
+ deleteRecursive(TopicZNode.path(topic), expectedControllerEpochZkVersion)
}
/**
* Deletes the topic configs for the given topics.
* @param topics the topics whose configs we wish to delete.
+ * @param expectedControllerEpochZkVersion expected controller epoch
zkVersion.
*/
- def deleteTopicConfigs(topics: Seq[String]): Unit = {
- val deleteRequests = topics.map(topic =>
DeleteRequest(ConfigEntityZNode.path(ConfigType.Topic, topic),
ZkVersion.MatchAnyVersion))
+ def deleteTopicConfigs(topics: Seq[String],
expectedControllerEpochZkVersion: Int): Unit = {
+ val deleteRequests = topics.map(topic =>
DeleteRequest(ConfigEntityZNode.path(ConfigType.Topic, topic),
+ ZkVersion.MatchAnyVersion, zkVersionCheck =
controllerZkVersionCheck(expectedControllerEpochZkVersion)))
retryRequestsUntilConnected(deleteRequests)
}
@@ -1403,18 +1494,19 @@ class KafkaZkClient private (zooKeeperClient:
ZooKeeperClient, isSecure: Boolean
/**
* Deletes the given zk path recursively
* @param path
+ * @param expectedControllerEpochZkVersion expected controller epoch
zkVersion.
* @return true if path gets deleted successfully, false if root path
doesn't exist
* @throws KeeperException if there is an error while deleting the znodes
*/
- def deleteRecursive(path: String): Boolean = {
+ def deleteRecursive(path: String, expectedControllerEpochZkVersion: Int =
ZkVersion.MatchAnyVersion): Boolean = {
val getChildrenResponse =
retryRequestUntilConnected(GetChildrenRequest(path))
getChildrenResponse.resultCode match {
case Code.OK =>
- getChildrenResponse.children.foreach(child =>
deleteRecursive(s"$path/$child"))
- val deleteResponse = retryRequestUntilConnected(DeleteRequest(path,
ZkVersion.MatchAnyVersion))
- if (deleteResponse.resultCode != Code.OK && deleteResponse.resultCode
!= Code.NONODE) {
+ getChildrenResponse.children.foreach(child =>
deleteRecursive(s"$path/$child", expectedControllerEpochZkVersion))
+ val deleteResponse = retryRequestUntilConnected(DeleteRequest(path,
ZkVersion.MatchAnyVersion,
+ zkVersionCheck =
controllerZkVersionCheck(expectedControllerEpochZkVersion)))
+ if (deleteResponse.resultCode != Code.OK && deleteResponse.resultCode
!= Code.NONODE)
throw deleteResponse.resultException.get
- }
true
case Code.NONODE => false
case _ => throw getChildrenResponse.resultException.get
@@ -1468,18 +1560,18 @@ class KafkaZkClient private (zooKeeperClient:
ZooKeeperClient, isSecure: Boolean
}
- private def createTopicPartition(partitions: Seq[TopicPartition]):
Seq[CreateResponse] = {
+ private def createTopicPartition(partitions: Seq[TopicPartition],
expectedControllerEpochZkVersion: Int): Seq[CreateResponse] = {
val createRequests = partitions.map { partition =>
val path = TopicPartitionZNode.path(partition)
- CreateRequest(path, null, acls(path), CreateMode.PERSISTENT,
Some(partition))
+ CreateRequest(path, null, acls(path), CreateMode.PERSISTENT,
Some(partition), controllerZkVersionCheck(expectedControllerEpochZkVersion))
}
retryRequestsUntilConnected(createRequests)
}
- private def createTopicPartitions(topics: Seq[String]): Seq[CreateResponse]
= {
+ private def createTopicPartitions(topics: Seq[String],
expectedControllerEpochZkVersion: Int):Seq[CreateResponse] = {
val createRequests = topics.map { topic =>
val path = TopicPartitionsZNode.path(topic)
- CreateRequest(path, null, acls(path), CreateMode.PERSISTENT, Some(topic))
+ CreateRequest(path, null, acls(path), CreateMode.PERSISTENT,
Some(topic), controllerZkVersionCheck(expectedControllerEpochZkVersion))
}
retryRequestsUntilConnected(createRequests)
}
@@ -1513,13 +1605,16 @@ class KafkaZkClient private (zooKeeperClient:
ZooKeeperClient, isSecure: Boolean
requestResponsePairs.foreach { case (request, response) =>
if (response.resultCode == Code.CONNECTIONLOSS)
remainingRequests += request
- else
+ else {
+ maybeThrowControllerMoveException(response)
responses += response
+ }
}
if (remainingRequests.nonEmpty)
zooKeeperClient.waitUntilConnected()
} else {
+ batchResponses.foreach(maybeThrowControllerMoveException)
remainingRequests.clear()
responses ++= batchResponses
}
@@ -1599,4 +1694,31 @@ object KafkaZkClient {
time, metricGroup, metricType)
new KafkaZkClient(zooKeeperClient, isSecure, time)
}
+
+
+ private def controllerZkVersionCheck(version: Int): Option[ZkVersionCheck] =
{
+ if (version < KafkaController.InitialControllerEpochZkVersion)
+ None
+ else
+ Some(ZkVersionCheck(ControllerEpochZNode.path, version))
+ }
+
+ private def maybeThrowControllerMoveException(response: AsyncResponse): Unit
= {
+ response.zkVersionCheckResult match {
+ case Some(zkVersionCheckResult) =>
+ val zkVersionCheck = zkVersionCheckResult.zkVersionCheck
+ if (zkVersionCheck.checkPath.equals(ControllerEpochZNode.path))
+ zkVersionCheckResult.opResult match {
+ case errorResult: ErrorResult =>
+ val errorCode = Code.get(errorResult.getErr)
+ if (errorCode == Code.BADVERSION)
+ // Throw ControllerMovedException when the zkVersionCheck is
performed on the controller epoch znode and the check fails
+ throw new ControllerMovedException(s"Controller epoch
zkVersion check fails. Expected zkVersion =
${zkVersionCheck.expectedZkVersion}")
+ else if (errorCode != Code.OK)
+ throw KeeperException.create(errorCode,
zkVersionCheck.checkPath)
+ case _ =>
+ }
+ case None =>
+ }
+ }
}
diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
index 97ec9a4..5930414 100755
--- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
+++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
@@ -17,21 +17,23 @@
package kafka.zookeeper
+import java.util
import java.util.Locale
import java.util.concurrent.locks.{ReentrantLock, ReentrantReadWriteLock}
-import java.util.concurrent.{ArrayBlockingQueue, ConcurrentHashMap,
CountDownLatch, Semaphore, TimeUnit}
+import java.util.concurrent._
import com.yammer.metrics.core.{Gauge, MetricName}
import kafka.metrics.KafkaMetricsGroup
import kafka.utils.CoreUtils.{inLock, inReadLock, inWriteLock}
import kafka.utils.{KafkaScheduler, Logging}
import org.apache.kafka.common.utils.Time
-import org.apache.zookeeper.AsyncCallback.{ACLCallback, Children2Callback,
DataCallback, StatCallback, StringCallback, VoidCallback}
+import org.apache.zookeeper.AsyncCallback._
import org.apache.zookeeper.KeeperException.Code
+import org.apache.zookeeper.OpResult.{CreateResult, SetDataResult}
import org.apache.zookeeper.Watcher.Event.{EventType, KeeperState}
import org.apache.zookeeper.ZooKeeper.States
import org.apache.zookeeper.data.{ACL, Stat}
-import org.apache.zookeeper.{CreateMode, KeeperException, WatchedEvent,
Watcher, ZooKeeper}
+import org.apache.zookeeper._
import scala.collection.JavaConverters._
import scala.collection.mutable.Set
@@ -156,6 +158,10 @@ class ZooKeeperClient(connectString: String,
responseQueue.asScala.toBuffer
}
}
+
+ def createTransaction(): Transaction = {
+ zooKeeper.transaction()
+ }
// Visibility to override for testing
private[zookeeper] def send[Req <: AsyncRequest](request:
Req)(processResponse: Req#Response => Unit): Unit = {
@@ -166,44 +172,76 @@ class ZooKeeperClient(connectString: String,
val sendTimeMs = time.hiResClockMs()
request match {
- case ExistsRequest(path, ctx) =>
+ case ExistsRequest(path, ctx, _) =>
zooKeeper.exists(path, shouldWatch(request), new StatCallback {
override def processResult(rc: Int, path: String, ctx: Any, stat:
Stat): Unit =
callback(ExistsResponse(Code.get(rc), path, Option(ctx), stat,
responseMetadata(sendTimeMs)))
}, ctx.orNull)
- case GetDataRequest(path, ctx) =>
+ case GetDataRequest(path, ctx, _) =>
zooKeeper.getData(path, shouldWatch(request), new DataCallback {
override def processResult(rc: Int, path: String, ctx: Any, data:
Array[Byte], stat: Stat): Unit =
callback(GetDataResponse(Code.get(rc), path, Option(ctx), data,
stat, responseMetadata(sendTimeMs)))
}, ctx.orNull)
- case GetChildrenRequest(path, ctx) =>
+ case GetChildrenRequest(path, ctx, _) =>
zooKeeper.getChildren(path, shouldWatch(request), new
Children2Callback {
override def processResult(rc: Int, path: String, ctx: Any,
children: java.util.List[String], stat: Stat): Unit =
callback(GetChildrenResponse(Code.get(rc), path, Option(ctx),
Option(children).map(_.asScala).getOrElse(Seq.empty), stat,
responseMetadata(sendTimeMs)))
}, ctx.orNull)
- case CreateRequest(path, data, acl, createMode, ctx) =>
- zooKeeper.create(path, data, acl.asJava, createMode, new
StringCallback {
- override def processResult(rc: Int, path: String, ctx: Any, name:
String): Unit =
- callback(CreateResponse(Code.get(rc), path, Option(ctx), name,
responseMetadata(sendTimeMs)))
- }, ctx.orNull)
- case SetDataRequest(path, data, version, ctx) =>
- zooKeeper.setData(path, data, version, new StatCallback {
- override def processResult(rc: Int, path: String, ctx: Any, stat:
Stat): Unit =
- callback(SetDataResponse(Code.get(rc), path, Option(ctx), stat,
responseMetadata(sendTimeMs)))
- }, ctx.orNull)
- case DeleteRequest(path, version, ctx) =>
- zooKeeper.delete(path, version, new VoidCallback {
- override def processResult(rc: Int, path: String, ctx: Any): Unit =
- callback(DeleteResponse(Code.get(rc), path, Option(ctx),
responseMetadata(sendTimeMs)))
- }, ctx.orNull)
- case GetAclRequest(path, ctx) =>
+ case CreateRequest(path, data, acl, createMode, ctx, zkVersionCheck) =>
+ if (zkVersionCheck.isEmpty)
+ zooKeeper.create(path, data, acl.asJava, createMode, new
StringCallback {
+ override def processResult(rc: Int, path: String, ctx: Any, name:
String): Unit =
+ callback(CreateResponse(Code.get(rc), path, Option(ctx), name,
responseMetadata(sendTimeMs)))
+ }, ctx.orNull)
+ else
+ zooKeeper.multi(Seq(zkVersionCheck.get.checkOp, Op.create(path,
data, acl.asJava, createMode)).asJava, new MultiCallback {
+ override def processResult(rc: Int, multiOpPath: String, ctx:
scala.Any, opResults: util.List[OpResult]): Unit = {
+ val (zkVersionCheckOpResult, requestOpResult) =
(opResults.get(0), opResults.get(1))
+ val name = requestOpResult match {
+ case c: CreateResult => c.getPath
+ case _ => null
+ }
+ callback(CreateResponse(Code.get(rc), path, Option(ctx), name,
responseMetadata(sendTimeMs),
+ Some(ZkVersionCheckResult(zkVersionCheck.get,
zkVersionCheckOpResult))))
+ }}, ctx.orNull)
+ case SetDataRequest(path, data, version, ctx, zkVersionCheck) =>
+ if (zkVersionCheck.isEmpty)
+ zooKeeper.setData(path, data, version, new StatCallback {
+ override def processResult(rc: Int, path: String, ctx: Any, stat:
Stat): Unit =
+ callback(SetDataResponse(Code.get(rc), path, Option(ctx), stat,
responseMetadata(sendTimeMs)))
+ }, ctx.orNull)
+ else
+ zooKeeper.multi(Seq(zkVersionCheck.get.checkOp, Op.setData(path,
data, version)).asJava, new MultiCallback {
+ override def processResult(rc: Int, multiOpPath: String, ctx:
scala.Any, opResults: util.List[OpResult]): Unit = {
+ val (zkVersionCheckOpResult, requestOpResult) =
(opResults.get(0), opResults.get(1))
+ val stat = requestOpResult match {
+ case s: SetDataResult => s.getStat
+ case _ => null
+ }
+ callback(SetDataResponse(Code.get(rc), path, Option(ctx), stat,
responseMetadata(sendTimeMs),
+ Some(ZkVersionCheckResult(zkVersionCheck.get,
zkVersionCheckOpResult))))
+ }}, ctx.orNull)
+ case DeleteRequest(path, version, ctx, zkVersionCheck) =>
+ if (zkVersionCheck.isEmpty)
+ zooKeeper.delete(path, version, new VoidCallback {
+ override def processResult(rc: Int, path: String, ctx: Any): Unit =
+ callback(DeleteResponse(Code.get(rc), path, Option(ctx),
responseMetadata(sendTimeMs)))
+ }, ctx.orNull)
+ else
+ zooKeeper.multi(Seq(zkVersionCheck.get.checkOp, Op.delete(path,
version)).asJava, new MultiCallback {
+ override def processResult(rc: Int, multiOpPath: String, ctx:
scala.Any, opResults: util.List[OpResult]): Unit = {
+ val (zkVersionCheckOpResult, _) = (opResults.get(0),
opResults.get(1))
+ callback(DeleteResponse(Code.get(rc), path, Option(ctx),
responseMetadata(sendTimeMs),
+ Some(ZkVersionCheckResult(zkVersionCheck.get,
zkVersionCheckOpResult))))
+ }}, ctx.orNull)
+ case GetAclRequest(path, ctx, _) =>
zooKeeper.getACL(path, null, new ACLCallback {
override def processResult(rc: Int, path: String, ctx: Any, acl:
java.util.List[ACL], stat: Stat): Unit = {
callback(GetAclResponse(Code.get(rc), path, Option(ctx),
Option(acl).map(_.asScala).getOrElse(Seq.empty),
stat, responseMetadata(sendTimeMs)))
- }}, ctx.orNull)
- case SetAclRequest(path, acl, version, ctx) =>
+ }}, ctx.orNull)
+ case SetAclRequest(path, acl, version, ctx, _) =>
zooKeeper.setACL(path, acl.asJava, version, new StatCallback {
override def processResult(rc: Int, path: String, ctx: Any, stat:
Stat): Unit =
callback(SetAclResponse(Code.get(rc), path, Option(ctx), stat,
responseMetadata(sendTimeMs)))
@@ -329,7 +367,7 @@ class ZooKeeperClient(connectString: String,
private[kafka] def currentZooKeeper: ZooKeeper =
inReadLock(initializationLock) {
zooKeeper
}
-
+
private def reinitialize(): Unit = {
// Initialization callbacks are invoked outside of the lock to avoid
deadlock potential since their completion
// may require additional Zookeeper requests, which will block to acquire
the initialization lock
@@ -447,45 +485,54 @@ sealed trait AsyncRequest {
type Response <: AsyncResponse
def path: String
def ctx: Option[Any]
+ def zkVersionCheck: Option[ZkVersionCheck]
+}
+
+case class ZkVersionCheck(checkPath: String, expectedZkVersion: Int) {
+ def checkOp: Op = Op.check(checkPath, expectedZkVersion)
}
+case class ZkVersionCheckResult(zkVersionCheck: ZkVersionCheck, opResult:
OpResult)
+
case class CreateRequest(path: String, data: Array[Byte], acl: Seq[ACL],
createMode: CreateMode,
- ctx: Option[Any] = None) extends AsyncRequest {
+ ctx: Option[Any] = None, zkVersionCheck:
Option[ZkVersionCheck] = None) extends AsyncRequest {
type Response = CreateResponse
}
-case class DeleteRequest(path: String, version: Int, ctx: Option[Any] = None)
extends AsyncRequest {
+case class DeleteRequest(path: String, version: Int, ctx: Option[Any] = None,
zkVersionCheck: Option[ZkVersionCheck] = None) extends AsyncRequest {
type Response = DeleteResponse
}
-case class ExistsRequest(path: String, ctx: Option[Any] = None) extends
AsyncRequest {
+case class ExistsRequest(path: String, ctx: Option[Any] = None,
zkVersionCheck: Option[ZkVersionCheck] = None) extends AsyncRequest {
type Response = ExistsResponse
}
-case class GetDataRequest(path: String, ctx: Option[Any] = None) extends
AsyncRequest {
+case class GetDataRequest(path: String, ctx: Option[Any] = None,
zkVersionCheck: Option[ZkVersionCheck] = None) extends AsyncRequest {
type Response = GetDataResponse
}
-case class SetDataRequest(path: String, data: Array[Byte], version: Int, ctx:
Option[Any] = None) extends AsyncRequest {
+case class SetDataRequest(path: String, data: Array[Byte], version: Int, ctx:
Option[Any] = None, zkVersionCheck: Option[ZkVersionCheck] = None) extends
AsyncRequest {
type Response = SetDataResponse
}
-case class GetAclRequest(path: String, ctx: Option[Any] = None) extends
AsyncRequest {
+case class GetAclRequest(path: String, ctx: Option[Any] = None,
zkVersionCheck: Option[ZkVersionCheck] = None) extends AsyncRequest {
type Response = GetAclResponse
}
-case class SetAclRequest(path: String, acl: Seq[ACL], version: Int, ctx:
Option[Any] = None) extends AsyncRequest {
+case class SetAclRequest(path: String, acl: Seq[ACL], version: Int, ctx:
Option[Any] = None, zkVersionCheck: Option[ZkVersionCheck] = None) extends
AsyncRequest {
type Response = SetAclResponse
}
-case class GetChildrenRequest(path: String, ctx: Option[Any] = None) extends
AsyncRequest {
+case class GetChildrenRequest(path: String, ctx: Option[Any] = None,
zkVersionCheck: Option[ZkVersionCheck] = None) extends AsyncRequest {
type Response = GetChildrenResponse
}
+
sealed abstract class AsyncResponse {
def resultCode: Code
def path: String
def ctx: Option[Any]
+ def zkVersionCheckResult: Option[ZkVersionCheckResult]
/** Return None if the result code is OK and KeeperException otherwise. */
def resultException: Option[KeeperException] =
@@ -506,17 +553,22 @@ case class ResponseMetadata(sendTimeMs: Long,
receivedTimeMs: Long) {
def responseTimeMs: Long = receivedTimeMs - sendTimeMs
}
-case class CreateResponse(resultCode: Code, path: String, ctx: Option[Any],
name: String, metadata: ResponseMetadata) extends AsyncResponse
-case class DeleteResponse(resultCode: Code, path: String, ctx: Option[Any],
metadata: ResponseMetadata) extends AsyncResponse
-case class ExistsResponse(resultCode: Code, path: String, ctx: Option[Any],
stat: Stat, metadata: ResponseMetadata) extends AsyncResponse
+case class CreateResponse(resultCode: Code, path: String, ctx: Option[Any],
name: String,
+ metadata: ResponseMetadata, zkVersionCheckResult:
Option[ZkVersionCheckResult] = None) extends AsyncResponse
+case class DeleteResponse(resultCode: Code, path: String, ctx: Option[Any],
+ metadata: ResponseMetadata, zkVersionCheckResult:
Option[ZkVersionCheckResult] = None) extends AsyncResponse
+case class ExistsResponse(resultCode: Code, path: String, ctx: Option[Any],
stat: Stat,
+ metadata: ResponseMetadata, zkVersionCheckResult:
Option[ZkVersionCheckResult] = None) extends AsyncResponse
case class GetDataResponse(resultCode: Code, path: String, ctx: Option[Any],
data: Array[Byte], stat: Stat,
- metadata: ResponseMetadata) extends AsyncResponse
-case class SetDataResponse(resultCode: Code, path: String, ctx: Option[Any],
stat: Stat, metadata: ResponseMetadata) extends AsyncResponse
+ metadata: ResponseMetadata, zkVersionCheckResult:
Option[ZkVersionCheckResult] = None) extends AsyncResponse
+case class SetDataResponse(resultCode: Code, path: String, ctx: Option[Any],
stat: Stat,
+ metadata: ResponseMetadata, zkVersionCheckResult:
Option[ZkVersionCheckResult] = None) extends AsyncResponse
case class GetAclResponse(resultCode: Code, path: String, ctx: Option[Any],
acl: Seq[ACL], stat: Stat,
- metadata: ResponseMetadata) extends AsyncResponse
-case class SetAclResponse(resultCode: Code, path: String, ctx: Option[Any],
stat: Stat, metadata: ResponseMetadata) extends AsyncResponse
+ metadata: ResponseMetadata, zkVersionCheckResult:
Option[ZkVersionCheckResult] = None) extends AsyncResponse
+case class SetAclResponse(resultCode: Code, path: String, ctx: Option[Any],
stat: Stat,
+ metadata: ResponseMetadata, zkVersionCheckResult:
Option[ZkVersionCheckResult] = None) extends AsyncResponse
case class GetChildrenResponse(resultCode: Code, path: String, ctx:
Option[Any], children: Seq[String], stat: Stat,
- metadata: ResponseMetadata) extends
AsyncResponse
+ metadata: ResponseMetadata,
zkVersionCheckResult: Option[ZkVersionCheckResult] = None) extends AsyncResponse
class ZooKeeperClientException(message: String) extends
RuntimeException(message)
class ZooKeeperClientExpiredException(message: String) extends
ZooKeeperClientException(message)
diff --git
a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
index 4f40b27..b44c239 100644
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
@@ -20,7 +20,7 @@ import kafka.common.AdminCommandFailedException
import kafka.server.{KafkaConfig, KafkaServer}
import kafka.utils.TestUtils._
import kafka.utils.{Logging, TestUtils}
-import kafka.zk.{ReassignPartitionsZNode, ZooKeeperTestHarness}
+import kafka.zk.{ReassignPartitionsZNode, ZkVersion, ZooKeeperTestHarness}
import org.junit.Assert.{assertEquals, assertTrue}
import org.junit.{After, Before, Test}
import kafka.admin.ReplicationQuotaUtils._
@@ -613,7 +613,7 @@ class ReassignPartitionsClusterTest extends
ZooKeeperTestHarness with Logging {
)
// Set znode directly to avoid non-existent topic validation
- zkClient.setOrCreatePartitionReassignment(firstMove)
+ zkClient.setOrCreatePartitionReassignment(firstMove,
ZkVersion.MatchAnyVersion)
servers.foreach(_.startup())
waitForReassignmentToComplete()
diff --git
a/core/src/test/scala/unit/kafka/controller/ControllerEventManagerTest.scala
b/core/src/test/scala/unit/kafka/controller/ControllerEventManagerTest.scala
index e5753e5..e0a753c 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerEventManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerEventManagerTest.scala
@@ -54,7 +54,7 @@ class ControllerEventManagerTest {
val controllerStats = new ControllerStats
val eventProcessedListenerCount = new AtomicInteger
controllerEventManager = new ControllerEventManager(0,
controllerStats.rateAndTimeMetrics,
- _ => eventProcessedListenerCount.incrementAndGet)
+ _ => eventProcessedListenerCount.incrementAndGet, () => ())
controllerEventManager.start()
val initialTimerCount = timer(metricName).count
diff --git
a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
index 5e5d84f..dc4076a 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
@@ -17,23 +17,29 @@
package kafka.controller
-import java.util.concurrent.LinkedBlockingQueue
+import java.util.Properties
+import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue}
import com.yammer.metrics.Metrics
import com.yammer.metrics.core.Timer
import kafka.api.LeaderAndIsr
import kafka.server.{KafkaConfig, KafkaServer}
import kafka.utils.TestUtils
-import kafka.zk.{PreferredReplicaElectionZNode, ZooKeeperTestHarness}
+import kafka.zk._
import org.junit.{After, Before, Test}
import org.junit.Assert.{assertEquals, assertTrue}
import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.ControllerMovedException
+import org.apache.log4j.Level
+import kafka.utils.LogCaptureAppender
import scala.collection.JavaConverters._
import scala.util.Try
class ControllerIntegrationTest extends ZooKeeperTestHarness {
var servers = Seq.empty[KafkaServer]
+ val firstControllerEpoch = KafkaController.InitialControllerEpoch + 1
+ val firstControllerEpochZkVersion =
KafkaController.InitialControllerEpochZkVersion + 1
@Before
override def setUp() {
@@ -51,30 +57,30 @@ class ControllerIntegrationTest extends
ZooKeeperTestHarness {
def testEmptyCluster(): Unit = {
servers = makeServers(1)
TestUtils.waitUntilTrue(() => zkClient.getControllerId.isDefined, "failed
to elect a controller")
- waitUntilControllerEpoch(KafkaController.InitialControllerEpoch, "broker
failed to set controller epoch")
+ waitUntilControllerEpoch(firstControllerEpoch, "broker failed to set
controller epoch")
}
@Test
def testControllerEpochPersistsWhenAllBrokersDown(): Unit = {
servers = makeServers(1)
TestUtils.waitUntilTrue(() => zkClient.getControllerId.isDefined, "failed
to elect a controller")
- waitUntilControllerEpoch(KafkaController.InitialControllerEpoch, "broker
failed to set controller epoch")
+ waitUntilControllerEpoch(firstControllerEpoch, "broker failed to set
controller epoch")
servers.head.shutdown()
servers.head.awaitShutdown()
TestUtils.waitUntilTrue(() => !zkClient.getControllerId.isDefined, "failed
to kill controller")
- waitUntilControllerEpoch(KafkaController.InitialControllerEpoch,
"controller epoch was not persisted after broker failure")
+ waitUntilControllerEpoch(firstControllerEpoch, "controller epoch was not
persisted after broker failure")
}
@Test
def testControllerMoveIncrementsControllerEpoch(): Unit = {
servers = makeServers(1)
TestUtils.waitUntilTrue(() => zkClient.getControllerId.isDefined, "failed
to elect a controller")
- waitUntilControllerEpoch(KafkaController.InitialControllerEpoch, "broker
failed to set controller epoch")
+ waitUntilControllerEpoch(firstControllerEpoch, "broker failed to set
controller epoch")
servers.head.shutdown()
servers.head.awaitShutdown()
servers.head.startup()
TestUtils.waitUntilTrue(() => zkClient.getControllerId.isDefined, "failed
to elect a controller")
- waitUntilControllerEpoch(KafkaController.InitialControllerEpoch + 1,
"controller epoch was not incremented after controller move")
+ waitUntilControllerEpoch(firstControllerEpoch + 1, "controller epoch was
not incremented after controller move")
}
@Test
@@ -83,7 +89,7 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
val tp = new TopicPartition("t", 0)
val assignment = Map(tp.partition -> Seq(0))
TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment =
assignment, servers = servers)
- waitForPartitionState(tp, KafkaController.InitialControllerEpoch, 0,
LeaderAndIsr.initialLeaderEpoch,
+ waitForPartitionState(tp, firstControllerEpoch, 0,
LeaderAndIsr.initialLeaderEpoch,
"failed to get expected partition state upon topic creation")
}
@@ -97,7 +103,7 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness
{
val tp = new TopicPartition("t", 0)
val assignment = Map(tp.partition -> Seq(otherBrokerId, controllerId))
TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment =
assignment, servers = servers.take(1))
- waitForPartitionState(tp, KafkaController.InitialControllerEpoch,
controllerId, LeaderAndIsr.initialLeaderEpoch,
+ waitForPartitionState(tp, firstControllerEpoch, controllerId,
LeaderAndIsr.initialLeaderEpoch,
"failed to get expected partition state upon topic creation")
}
@@ -109,8 +115,8 @@ class ControllerIntegrationTest extends
ZooKeeperTestHarness {
val assignment = Map(tp0.partition -> Seq(0))
val expandedAssignment = Map(tp0 -> Seq(0), tp1 -> Seq(0))
TestUtils.createTopic(zkClient, tp0.topic, partitionReplicaAssignment =
assignment, servers = servers)
- zkClient.setTopicAssignment(tp0.topic, expandedAssignment)
- waitForPartitionState(tp1, KafkaController.InitialControllerEpoch, 0,
LeaderAndIsr.initialLeaderEpoch,
+ zkClient.setTopicAssignment(tp0.topic, expandedAssignment,
firstControllerEpochZkVersion)
+ waitForPartitionState(tp1, firstControllerEpoch, 0,
LeaderAndIsr.initialLeaderEpoch,
"failed to get expected partition state upon topic partition expansion")
TestUtils.waitUntilMetadataIsPropagated(servers, tp1.topic, tp1.partition)
}
@@ -127,8 +133,8 @@ class ControllerIntegrationTest extends
ZooKeeperTestHarness {
TestUtils.createTopic(zkClient, tp0.topic, partitionReplicaAssignment =
assignment, servers = servers)
servers(otherBrokerId).shutdown()
servers(otherBrokerId).awaitShutdown()
- zkClient.setTopicAssignment(tp0.topic, expandedAssignment)
- waitForPartitionState(tp1, KafkaController.InitialControllerEpoch,
controllerId, LeaderAndIsr.initialLeaderEpoch,
+ zkClient.setTopicAssignment(tp0.topic, expandedAssignment,
firstControllerEpochZkVersion)
+ waitForPartitionState(tp1, firstControllerEpoch, controllerId,
LeaderAndIsr.initialLeaderEpoch,
"failed to get expected partition state upon topic partition expansion")
TestUtils.waitUntilMetadataIsPropagated(Seq(servers(controllerId)),
tp1.topic, tp1.partition)
}
@@ -147,7 +153,7 @@ class ControllerIntegrationTest extends
ZooKeeperTestHarness {
val reassignment = Map(tp -> Seq(otherBrokerId))
TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment =
assignment, servers = servers)
zkClient.createPartitionReassignment(reassignment)
- waitForPartitionState(tp, KafkaController.InitialControllerEpoch,
otherBrokerId, LeaderAndIsr.initialLeaderEpoch + 3,
+ waitForPartitionState(tp, firstControllerEpoch, otherBrokerId,
LeaderAndIsr.initialLeaderEpoch + 3,
"failed to get expected partition state after partition reassignment")
TestUtils.waitUntilTrue(() =>
zkClient.getReplicaAssignmentForTopics(Set(tp.topic)) == reassignment,
"failed to get updated partition assignment on topic znode after
partition reassignment")
@@ -169,8 +175,9 @@ class ControllerIntegrationTest extends
ZooKeeperTestHarness {
TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment =
assignment, servers = servers)
servers(otherBrokerId).shutdown()
servers(otherBrokerId).awaitShutdown()
- zkClient.setOrCreatePartitionReassignment(reassignment)
- waitForPartitionState(tp, KafkaController.InitialControllerEpoch,
controllerId, LeaderAndIsr.initialLeaderEpoch + 1,
+ val controller = getController()
+ zkClient.setOrCreatePartitionReassignment(reassignment,
controller.kafkaController.controllerContext.epochZkVersion)
+ waitForPartitionState(tp, firstControllerEpoch, controllerId,
LeaderAndIsr.initialLeaderEpoch + 1,
"failed to get expected partition state during partition reassignment
with offline replica")
TestUtils.waitUntilTrue(() => zkClient.reassignPartitionsInProgress(),
"partition reassignment path should remain while reassignment in
progress")
@@ -188,10 +195,10 @@ class ControllerIntegrationTest extends
ZooKeeperTestHarness {
servers(otherBrokerId).shutdown()
servers(otherBrokerId).awaitShutdown()
zkClient.createPartitionReassignment(reassignment)
- waitForPartitionState(tp, KafkaController.InitialControllerEpoch,
controllerId, LeaderAndIsr.initialLeaderEpoch + 1,
+ waitForPartitionState(tp, firstControllerEpoch, controllerId,
LeaderAndIsr.initialLeaderEpoch + 1,
"failed to get expected partition state during partition reassignment
with offline replica")
servers(otherBrokerId).startup()
- waitForPartitionState(tp, KafkaController.InitialControllerEpoch,
otherBrokerId, LeaderAndIsr.initialLeaderEpoch + 4,
+ waitForPartitionState(tp, firstControllerEpoch, otherBrokerId,
LeaderAndIsr.initialLeaderEpoch + 4,
"failed to get expected partition state after partition reassignment")
TestUtils.waitUntilTrue(() =>
zkClient.getReplicaAssignmentForTopics(Set(tp.topic)) == reassignment,
"failed to get updated partition assignment on topic znode after
partition reassignment")
@@ -235,7 +242,7 @@ class ControllerIntegrationTest extends
ZooKeeperTestHarness {
zkClient.createPreferredReplicaElection(Set(tp))
TestUtils.waitUntilTrue(() =>
!zkClient.pathExists(PreferredReplicaElectionZNode.path),
"failed to remove preferred replica leader election path after giving
up")
- waitForPartitionState(tp, KafkaController.InitialControllerEpoch,
controllerId, LeaderAndIsr.initialLeaderEpoch + 1,
+ waitForPartitionState(tp, firstControllerEpoch, controllerId,
LeaderAndIsr.initialLeaderEpoch + 1,
"failed to get expected partition state upon broker shutdown")
}
@@ -249,10 +256,10 @@ class ControllerIntegrationTest extends
ZooKeeperTestHarness {
TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment =
assignment, servers = servers)
servers(otherBrokerId).shutdown()
servers(otherBrokerId).awaitShutdown()
- waitForPartitionState(tp, KafkaController.InitialControllerEpoch,
controllerId, LeaderAndIsr.initialLeaderEpoch + 1,
+ waitForPartitionState(tp, firstControllerEpoch, controllerId,
LeaderAndIsr.initialLeaderEpoch + 1,
"failed to get expected partition state upon broker shutdown")
servers(otherBrokerId).startup()
- waitForPartitionState(tp, KafkaController.InitialControllerEpoch,
otherBrokerId, LeaderAndIsr.initialLeaderEpoch + 2,
+ waitForPartitionState(tp, firstControllerEpoch, otherBrokerId,
LeaderAndIsr.initialLeaderEpoch + 2,
"failed to get expected partition state upon broker startup")
}
@@ -264,14 +271,14 @@ class ControllerIntegrationTest extends
ZooKeeperTestHarness {
val tp = new TopicPartition("t", 0)
val assignment = Map(tp.partition -> Seq(otherBrokerId))
TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment =
assignment, servers = servers)
- waitForPartitionState(tp, KafkaController.InitialControllerEpoch,
otherBrokerId, LeaderAndIsr.initialLeaderEpoch,
+ waitForPartitionState(tp, firstControllerEpoch, otherBrokerId,
LeaderAndIsr.initialLeaderEpoch,
"failed to get expected partition state upon topic creation")
servers(otherBrokerId).shutdown()
servers(otherBrokerId).awaitShutdown()
TestUtils.waitUntilTrue(() => {
val leaderIsrAndControllerEpochMap =
zkClient.getTopicPartitionStates(Seq(tp))
leaderIsrAndControllerEpochMap.contains(tp) &&
- isExpectedPartitionState(leaderIsrAndControllerEpochMap(tp),
KafkaController.InitialControllerEpoch, LeaderAndIsr.NoLeader,
LeaderAndIsr.initialLeaderEpoch + 1) &&
+ isExpectedPartitionState(leaderIsrAndControllerEpochMap(tp),
firstControllerEpoch, LeaderAndIsr.NoLeader, LeaderAndIsr.initialLeaderEpoch +
1) &&
leaderIsrAndControllerEpochMap(tp).leaderAndIsr.isr ==
List(otherBrokerId)
}, "failed to get expected partition state after entire isr went offline")
}
@@ -284,14 +291,14 @@ class ControllerIntegrationTest extends
ZooKeeperTestHarness {
val tp = new TopicPartition("t", 0)
val assignment = Map(tp.partition -> Seq(otherBrokerId))
TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment =
assignment, servers = servers)
- waitForPartitionState(tp, KafkaController.InitialControllerEpoch,
otherBrokerId, LeaderAndIsr.initialLeaderEpoch,
+ waitForPartitionState(tp, firstControllerEpoch, otherBrokerId,
LeaderAndIsr.initialLeaderEpoch,
"failed to get expected partition state upon topic creation")
servers(1).shutdown()
servers(1).awaitShutdown()
TestUtils.waitUntilTrue(() => {
val leaderIsrAndControllerEpochMap =
zkClient.getTopicPartitionStates(Seq(tp))
leaderIsrAndControllerEpochMap.contains(tp) &&
- isExpectedPartitionState(leaderIsrAndControllerEpochMap(tp),
KafkaController.InitialControllerEpoch, LeaderAndIsr.NoLeader,
LeaderAndIsr.initialLeaderEpoch + 1) &&
+ isExpectedPartitionState(leaderIsrAndControllerEpochMap(tp),
firstControllerEpoch, LeaderAndIsr.NoLeader, LeaderAndIsr.initialLeaderEpoch +
1) &&
leaderIsrAndControllerEpochMap(tp).leaderAndIsr.isr ==
List(otherBrokerId)
}, "failed to get expected partition state after entire isr went offline")
}
@@ -341,18 +348,105 @@ class ControllerIntegrationTest extends
ZooKeeperTestHarness {
assertTrue(servers.forall(_.apis.metadataCache.getPartitionInfo(topic,partition).get.basePartitionState.leader
== 0))
}
+ @Test
+ def testControllerMoveOnTopicCreation(): Unit = {
+ servers = makeServers(1)
+ TestUtils.waitUntilControllerElected(zkClient)
+ val tp = new TopicPartition("t", 0)
+ val assignment = Map(tp.partition -> Seq(0))
+
+ testControllerMove(() => {
+ val adminZkClient = new AdminZkClient(zkClient)
+ adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(tp.topic,
assignment, new Properties())
+ })
+ }
+
+ @Test
+ def testControllerMoveOnTopicDeletion(): Unit = {
+ servers = makeServers(1)
+ TestUtils.waitUntilControllerElected(zkClient)
+ val tp = new TopicPartition("t", 0)
+ val assignment = Map(tp.partition -> Seq(0))
+ TestUtils.createTopic(zkClient, tp.topic(), assignment, servers)
+
+ testControllerMove(() => {
+ val adminZkClient = new AdminZkClient(zkClient)
+ adminZkClient.deleteTopic(tp.topic())
+ })
+ }
+
+ @Test
+ def testControllerMoveOnPreferredReplicaElection(): Unit = {
+ servers = makeServers(1)
+ val tp = new TopicPartition("t", 0)
+ val assignment = Map(tp.partition -> Seq(0))
+ TestUtils.createTopic(zkClient, tp.topic(), assignment, servers)
+
+ testControllerMove(() => zkClient.createPreferredReplicaElection(Set(tp)))
+ }
+
+ @Test
+ def testControllerMoveOnPartitionReassignment(): Unit = {
+ servers = makeServers(1)
+ TestUtils.waitUntilControllerElected(zkClient)
+ val tp = new TopicPartition("t", 0)
+ val assignment = Map(tp.partition -> Seq(0))
+ TestUtils.createTopic(zkClient, tp.topic(), assignment, servers)
+
+ val reassignment = Map(tp -> Seq(0))
+ testControllerMove(() =>
zkClient.createPartitionReassignment(reassignment))
+ }
+
+ private def testControllerMove(fun: () => Unit): Unit = {
+ val controller = getController().kafkaController
+ val appender = LogCaptureAppender.createAndRegister()
+ val previousLevel =
LogCaptureAppender.setClassLoggerLevel(controller.eventManager.thread.getClass,
Level.INFO)
+
+ try {
+ TestUtils.waitUntilTrue(() => {
+ controller.eventManager.state == ControllerState.Idle
+ }, "Controller event thread is still busy")
+
+ val latch = new CountDownLatch(1)
+
+ // Let the controller event thread await on a latch before the
pre-defined logic is triggered.
+ // This is used to make sure that when the event thread resumes and
starts processing events, the controller has already moved.
+ controller.eventManager.put(KafkaController.AwaitOnLatch(latch))
+ // Execute pre-defined logic. This can be topic creation/deletion,
preferred leader election, etc.
+ fun()
+
+ // Delete the controller path, re-create /controller znode to emulate
controller movement
+ zkClient.deleteController(controller.controllerContext.epochZkVersion)
+ zkClient.registerControllerAndIncrementControllerEpoch(servers.size)
+
+ // Resume the controller event thread. At this point, the controller
should see mismatch controller epoch zkVersion and resign
+ latch.countDown()
+ TestUtils.waitUntilTrue(() => !controller.isActive, "Controller fails to
resign")
+
+ // Expect to capture the ControllerMovedException in the log of
ControllerEventThread
+ val event = appender.getMessages.find(e => e.getLevel == Level.INFO
+ && e.getThrowableInformation != null
+ && e.getThrowableInformation.getThrowable.getClass.getName.equals(new
ControllerMovedException("").getClass.getName))
+ assertTrue(event.isDefined)
+
+ } finally {
+ LogCaptureAppender.unregister(appender)
+
LogCaptureAppender.setClassLoggerLevel(controller.eventManager.thread.getClass,
previousLevel)
+ }
+ }
+
private def preferredReplicaLeaderElection(controllerId: Int, otherBroker:
KafkaServer, tp: TopicPartition,
replicas: Set[Int], leaderEpoch:
Int): Unit = {
otherBroker.shutdown()
otherBroker.awaitShutdown()
- waitForPartitionState(tp, KafkaController.InitialControllerEpoch,
controllerId, leaderEpoch + 1,
+ waitForPartitionState(tp, firstControllerEpoch, controllerId, leaderEpoch
+ 1,
"failed to get expected partition state upon broker shutdown")
otherBroker.startup()
TestUtils.waitUntilTrue(() => zkClient.getInSyncReplicasForPartition(new
TopicPartition(tp.topic, tp.partition)).get.toSet == replicas, "restarted
broker failed to join in-sync replicas")
zkClient.createPreferredReplicaElection(Set(tp))
TestUtils.waitUntilTrue(() =>
!zkClient.pathExists(PreferredReplicaElectionZNode.path),
"failed to remove preferred replica leader election path after
completion")
- waitForPartitionState(tp, KafkaController.InitialControllerEpoch,
otherBroker.config.brokerId, leaderEpoch + 2,
+ waitForPartitionState(tp, firstControllerEpoch,
otherBroker.config.brokerId, leaderEpoch + 2,
"failed to get expected partition state upon broker startup")
}
@@ -395,4 +489,9 @@ class ControllerIntegrationTest extends
ZooKeeperTestHarness {
.getOrElse(fail(s"Unable to find metric
$metricName")).asInstanceOf[Timer]
}
+ private def getController(): KafkaServer = {
+ val controllerId = TestUtils.waitUntilControllerElected(zkClient)
+ servers.filter(s => s.config.brokerId == controllerId).head
+ }
+
}
diff --git
a/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
b/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
index 6a587f3..b89632e 100644
--- a/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
@@ -22,7 +22,7 @@ import kafka.server.KafkaConfig
import kafka.utils.TestUtils
import kafka.zk.{KafkaZkClient, TopicPartitionStateZNode}
import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
-import kafka.zookeeper.{CreateResponse, GetDataResponse, ResponseMetadata,
ZooKeeperClientException}
+import kafka.zookeeper._
import org.apache.kafka.common.TopicPartition
import org.apache.zookeeper.KeeperException.Code
import org.apache.zookeeper.data.Stat
@@ -85,7 +85,7 @@ class PartitionStateMachineTest extends JUnitSuite {
partitionState.put(partition, NewPartition)
val leaderIsrAndControllerEpoch =
LeaderIsrAndControllerEpoch(LeaderAndIsr(brokerId, List(brokerId)),
controllerEpoch)
EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
- EasyMock.expect(mockZkClient.createTopicPartitionStatesRaw(Map(partition
-> leaderIsrAndControllerEpoch)))
+ EasyMock.expect(mockZkClient.createTopicPartitionStatesRaw(Map(partition
-> leaderIsrAndControllerEpoch), controllerContext.epochZkVersion))
.andReturn(Seq(CreateResponse(Code.OK, null, Some(partition), null,
ResponseMetadata(0, 0))))
EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(brokerId),
partition, leaderIsrAndControllerEpoch, Seq(brokerId), isNew = true))
@@ -103,7 +103,7 @@ class PartitionStateMachineTest extends JUnitSuite {
partitionState.put(partition, NewPartition)
val leaderIsrAndControllerEpoch =
LeaderIsrAndControllerEpoch(LeaderAndIsr(brokerId, List(brokerId)),
controllerEpoch)
EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
- EasyMock.expect(mockZkClient.createTopicPartitionStatesRaw(Map(partition
-> leaderIsrAndControllerEpoch)))
+ EasyMock.expect(mockZkClient.createTopicPartitionStatesRaw(Map(partition
-> leaderIsrAndControllerEpoch), controllerContext.epochZkVersion))
.andThrow(new ZooKeeperClientException("test"))
EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch)
@@ -119,7 +119,7 @@ class PartitionStateMachineTest extends JUnitSuite {
partitionState.put(partition, NewPartition)
val leaderIsrAndControllerEpoch =
LeaderIsrAndControllerEpoch(LeaderAndIsr(brokerId, List(brokerId)),
controllerEpoch)
EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
- EasyMock.expect(mockZkClient.createTopicPartitionStatesRaw(Map(partition
-> leaderIsrAndControllerEpoch)))
+ EasyMock.expect(mockZkClient.createTopicPartitionStatesRaw(Map(partition
-> leaderIsrAndControllerEpoch), controllerContext.epochZkVersion))
.andReturn(Seq(CreateResponse(Code.NODEEXISTS, null, Some(partition),
null, ResponseMetadata(0, 0))))
EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch)
@@ -159,7 +159,7 @@ class PartitionStateMachineTest extends JUnitSuite {
val leaderAndIsrAfterElection = leaderAndIsr.newLeader(brokerId)
val updatedLeaderAndIsr = leaderAndIsrAfterElection.withZkVersion(2)
- EasyMock.expect(mockZkClient.updateLeaderAndIsr(Map(partition ->
leaderAndIsrAfterElection), controllerEpoch))
+ EasyMock.expect(mockZkClient.updateLeaderAndIsr(Map(partition ->
leaderAndIsrAfterElection), controllerEpoch, controllerContext.epochZkVersion))
.andReturn(UpdateLeaderAndIsrResult(Map(partition ->
updatedLeaderAndIsr), Seq.empty, Map.empty))
EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(brokerId),
partition, LeaderIsrAndControllerEpoch(updatedLeaderAndIsr,
controllerEpoch), Seq(brokerId), isNew = false))
@@ -190,7 +190,7 @@ class PartitionStateMachineTest extends JUnitSuite {
val leaderAndIsrAfterElection =
leaderAndIsr.newLeaderAndIsr(otherBrokerId, List(otherBrokerId))
val updatedLeaderAndIsr = leaderAndIsrAfterElection.withZkVersion(2)
- EasyMock.expect(mockZkClient.updateLeaderAndIsr(Map(partition ->
leaderAndIsrAfterElection), controllerEpoch))
+ EasyMock.expect(mockZkClient.updateLeaderAndIsr(Map(partition ->
leaderAndIsrAfterElection), controllerEpoch, controllerContext.epochZkVersion))
.andReturn(UpdateLeaderAndIsrResult(Map(partition ->
updatedLeaderAndIsr), Seq.empty, Map.empty))
EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(otherBrokerId),
partition, LeaderIsrAndControllerEpoch(updatedLeaderAndIsr,
controllerEpoch), Seq(brokerId, otherBrokerId),
@@ -243,7 +243,7 @@ class PartitionStateMachineTest extends JUnitSuite {
.andReturn((Map(partition.topic -> LogConfig()), Map.empty))
val leaderAndIsrAfterElection = leaderAndIsr.newLeader(brokerId)
val updatedLeaderAndIsr = leaderAndIsrAfterElection.withZkVersion(2)
- EasyMock.expect(mockZkClient.updateLeaderAndIsr(Map(partition ->
leaderAndIsrAfterElection), controllerEpoch))
+ EasyMock.expect(mockZkClient.updateLeaderAndIsr(Map(partition ->
leaderAndIsrAfterElection), controllerEpoch, controllerContext.epochZkVersion))
.andReturn(UpdateLeaderAndIsrResult(Map(partition ->
updatedLeaderAndIsr), Seq.empty, Map.empty))
EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(brokerId),
partition, LeaderIsrAndControllerEpoch(updatedLeaderAndIsr,
controllerEpoch), Seq(brokerId), isNew = false))
@@ -336,7 +336,7 @@ class PartitionStateMachineTest extends JUnitSuite {
val updatedLeaderAndIsr = partitions.map { partition =>
partition -> leaderAndIsr.newLeaderAndIsr(brokerId, List(brokerId))
}.toMap
- EasyMock.expect(mockZkClient.updateLeaderAndIsr(updatedLeaderAndIsr,
controllerEpoch))
+ EasyMock.expect(mockZkClient.updateLeaderAndIsr(updatedLeaderAndIsr,
controllerEpoch, controllerContext.epochZkVersion))
.andReturn(UpdateLeaderAndIsrResult(updatedLeaderAndIsr, Seq.empty,
Map.empty))
}
prepareMockToUpdateLeaderAndIsr()
diff --git
a/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala
b/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala
index c573c9f..ef274fa 100644
--- a/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala
@@ -183,7 +183,7 @@ class ReplicaStateMachineTest extends JUnitSuite {
EasyMock.expect(mockZkClient.getTopicPartitionStatesRaw(partitions)).andReturn(
Seq(GetDataResponse(Code.OK, null, Some(partition),
TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch), stat,
ResponseMetadata(0, 0))))
- EasyMock.expect(mockZkClient.updateLeaderAndIsr(Map(partition ->
adjustedLeaderAndIsr), controllerEpoch))
+ EasyMock.expect(mockZkClient.updateLeaderAndIsr(Map(partition ->
adjustedLeaderAndIsr), controllerEpoch, controllerContext.epochZkVersion))
.andReturn(UpdateLeaderAndIsrResult(Map(partition ->
updatedLeaderAndIsr), Seq.empty, Map.empty))
EasyMock.expect(mockTopicDeletionManager.isTopicQueuedUpForDeletion(partition.topic)).andReturn(false)
EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(otherBrokerId),
diff --git a/core/src/test/scala/unit/kafka/utils/LogCaptureAppender.scala
b/core/src/test/scala/unit/kafka/utils/LogCaptureAppender.scala
new file mode 100644
index 0000000..80472e9
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/utils/LogCaptureAppender.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+
+import org.apache.log4j.{AppenderSkeleton, Level, Logger}
+import org.apache.log4j.spi.LoggingEvent
+
+import scala.collection.mutable.ListBuffer
+
+/**
+ * Log4j appender that keeps every [[LoggingEvent]] it receives so tests can assert on
+ * what was logged; see [[LogCaptureAppender.createAndRegister]] to attach it.
+ */
+class LogCaptureAppender extends AppenderSkeleton {
+  // Only read or written while holding this buffer's own monitor.
+  private val events: ListBuffer[LoggingEvent] = ListBuffer.empty
+
+  override protected def append(event: LoggingEvent): Unit =
+    events.synchronized { events += event }
+
+  /**
+   * Returns a snapshot copy of the events captured so far. The copy is unaffected by
+   * later appends and can be mutated by the caller without touching this appender.
+   */
+  def getMessages: ListBuffer[LoggingEvent] = events.synchronized { events.clone() }
+
+  /** Discards all captured events. */
+  override def close(): Unit = events.synchronized { events.clear() }
+
+  // Events are kept as raw LoggingEvents, so no layout is needed to format them.
+  override def requiresLayout: Boolean = false
+}
+
+object LogCaptureAppender {
+  /** Creates a [[LogCaptureAppender]] and attaches it to the log4j root logger. */
+  def createAndRegister(): LogCaptureAppender = {
+    val logCaptureAppender = new LogCaptureAppender
+    Logger.getRootLogger.addAppender(logCaptureAppender)
+    logCaptureAppender
+  }
+  /** Sets the level of the logger for `clazz`; returns the level it had before the call. */
+  def setClassLoggerLevel(clazz: Class[_], logLevel: Level): Level = {
+    val logger = Logger.getLogger(clazz)
+    val previousLevel = logger.getLevel
+    logger.setLevel(logLevel) // reuse `logger` instead of looking the logger up a second time
+    previousLevel
+  }
+  /** Detaches `logCaptureAppender` from the log4j root logger. */
+  def unregister(logCaptureAppender: LogCaptureAppender): Unit =
+    Logger.getRootLogger.removeAppender(logCaptureAppender)
+}
diff --git a/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
b/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
index 3389161..65273eb 100644
--- a/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
@@ -20,7 +20,7 @@ package kafka.utils
import kafka.server.{KafkaConfig, ReplicaFetcherManager}
import kafka.api.LeaderAndIsr
import kafka.controller.LeaderIsrAndControllerEpoch
-import kafka.zk.{IsrChangeNotificationZNode, TopicZNode, ZooKeeperTestHarness}
+import kafka.zk._
import org.apache.kafka.common.TopicPartition
import org.junit.Assert._
import org.junit.{Before, Test}
@@ -42,7 +42,7 @@ class ReplicationUtilsTest extends ZooKeeperTestHarness {
val topicPartition = new TopicPartition(topic, partition)
val leaderAndIsr = LeaderAndIsr(leader, leaderEpoch, isr, 1)
val leaderIsrAndControllerEpoch =
LeaderIsrAndControllerEpoch(leaderAndIsr, controllerEpoch)
- zkClient.createTopicPartitionStatesRaw(Map(topicPartition ->
leaderIsrAndControllerEpoch))
+ zkClient.createTopicPartitionStatesRaw(Map(topicPartition ->
leaderIsrAndControllerEpoch), ZkVersion.MatchAnyVersion)
}
@Test
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 48d7d3f..d42d02c 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -36,7 +36,7 @@ import kafka.server._
import kafka.server.checkpoints.OffsetCheckpointFile
import Implicits._
import kafka.controller.LeaderIsrAndControllerEpoch
-import kafka.zk.{AdminZkClient, BrokerIdsZNode, BrokerInfo, KafkaZkClient}
+import kafka.zk._
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.admin.{AdminClient, AlterConfigsResult,
Config, ConfigEntry}
import org.apache.kafka.clients.consumer._
@@ -635,7 +635,7 @@ object TestUtils extends Logging {
.getOrElse(LeaderAndIsr(leader, List(leader)))
topicPartition -> LeaderIsrAndControllerEpoch(newLeaderAndIsr,
controllerEpoch)
}
- zkClient.setTopicPartitionStatesRaw(newLeaderIsrAndControllerEpochs)
+ zkClient.setTopicPartitionStatesRaw(newLeaderIsrAndControllerEpochs,
ZkVersion.MatchAnyVersion)
}
/**
diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index 9cffb51..61ca3bb 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -42,6 +42,7 @@ import scala.util.Random
import kafka.controller.LeaderIsrAndControllerEpoch
import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
import kafka.zookeeper._
+import org.apache.kafka.common.errors.ControllerMovedException
import org.apache.kafka.common.security.JaasUtils
import org.apache.zookeeper.data.Stat
@@ -55,12 +56,14 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
val topicPartition11 = new TopicPartition(topic1, 1)
val topicPartition20 = new TopicPartition(topic2, 0)
val topicPartitions10_11 = Seq(topicPartition10, topicPartition11)
+ val controllerEpochZkVersion = 0
var otherZkClient: KafkaZkClient = _
@Before
override def setUp(): Unit = {
super.setUp()
+ zkClient.createControllerEpochRaw(1)
otherZkClient = KafkaZkClient(zkConnect,
zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled), zkSessionTimeout,
zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM)
}
@@ -69,6 +72,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
override def tearDown(): Unit = {
if (otherZkClient != null)
otherZkClient.close()
+ zkClient.deletePath(ControllerEpochZNode.path)
super.tearDown()
}
@@ -99,16 +103,29 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
zkClient.createRecursive("/delete/some/random/path")
assertTrue(zkClient.pathExists("/delete/some/random/path"))
- zkClient.deleteRecursive("/delete")
- assertFalse(zkClient.pathExists("/delete/some/random/path"))
- assertFalse(zkClient.pathExists("/delete/some/random"))
- assertFalse(zkClient.pathExists("/delete/some"))
+ assertTrue(zkClient.deleteRecursive("/delete"))
assertFalse(zkClient.pathExists("/delete"))
intercept[IllegalArgumentException](zkClient.deleteRecursive("delete-invalid-path"))
}
@Test
+ def testDeleteRecursiveWithControllerEpochVersionCheck(): Unit = {
+ assertFalse(zkClient.deleteRecursive("/delete/does-not-exist", controllerEpochZkVersion))
+
+ zkClient.createRecursive("/delete/some/random/path")
+ assertTrue(zkClient.pathExists("/delete/some/random/path"))
+ // Mismatched controller epoch zkVersion: the delete must be rejected and leave the tree intact
+ intercept[ControllerMovedException](zkClient.deleteRecursive("/delete", controllerEpochZkVersion + 1))
+ assertTrue(zkClient.pathExists("/delete/some/random/path"))
+
+ assertTrue(zkClient.deleteRecursive("/delete", controllerEpochZkVersion))
+ assertFalse(zkClient.pathExists("/delete"))
+
+ intercept[IllegalArgumentException](zkClient.deleteRecursive("delete-invalid-path", controllerEpochZkVersion))
+ }
+
+ @Test
def testCreateRecursive() {
zkClient.createRecursive("/create-newrootpath")
assertTrue(zkClient.pathExists("/create-newrootpath"))
@@ -268,7 +285,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
@Test
def testIsrChangeNotificationsDeletion(): Unit = {
// Should not fail even if parent node does not exist
- zkClient.deleteIsrChangeNotifications(Seq("0000000000"))
+ zkClient.deleteIsrChangeNotifications(Seq("0000000000"),
controllerEpochZkVersion)
zkClient.createRecursive("/isr_change_notification")
@@ -276,13 +293,18 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
zkClient.propagateIsrChanges(Set(topicPartition10))
zkClient.propagateIsrChanges(Set(topicPartition11))
- zkClient.deleteIsrChangeNotifications(Seq("0000000001"))
+ // Should throw exception if the controllerEpochZkVersion does not match
+
intercept[ControllerMovedException](zkClient.deleteIsrChangeNotifications(Seq("0000000001"),
controllerEpochZkVersion + 1))
+ // Delete should not succeed
+ assertEquals(Set("0000000000", "0000000001", "0000000002"),
zkClient.getAllIsrChangeNotifications.toSet)
+
+ zkClient.deleteIsrChangeNotifications(Seq("0000000001"),
controllerEpochZkVersion)
// Should not fail if called on a non-existent notification
- zkClient.deleteIsrChangeNotifications(Seq("0000000001"))
+ zkClient.deleteIsrChangeNotifications(Seq("0000000001"),
controllerEpochZkVersion)
assertEquals(Set("0000000000", "0000000002"),
zkClient.getAllIsrChangeNotifications.toSet)
- zkClient.deleteIsrChangeNotifications()
- assertEquals(Seq.empty,zkClient.getAllIsrChangeNotifications)
+ zkClient.deleteIsrChangeNotifications(controllerEpochZkVersion)
+ assertEquals(Seq.empty, zkClient.getAllIsrChangeNotifications)
}
@Test
@@ -335,7 +357,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
@Test
def testLogDirEventNotificationsDeletion(): Unit = {
// Should not fail even if parent node does not exist
- zkClient.deleteLogDirEventNotifications(Seq("0000000000", "0000000002"))
+ zkClient.deleteLogDirEventNotifications(Seq("0000000000", "0000000002"),
controllerEpochZkVersion)
zkClient.createRecursive("/log_dir_event_notification")
@@ -346,13 +368,16 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
zkClient.propagateLogDirEvent(brokerId)
zkClient.propagateLogDirEvent(anotherBrokerId)
- zkClient.deleteLogDirEventNotifications(Seq("0000000000", "0000000002"))
+
intercept[ControllerMovedException](zkClient.deleteLogDirEventNotifications(Seq("0000000000",
"0000000002"), controllerEpochZkVersion + 1))
+ assertEquals(Seq("0000000000", "0000000001", "0000000002"),
zkClient.getAllLogDirEventNotifications)
+
+ zkClient.deleteLogDirEventNotifications(Seq("0000000000", "0000000002"),
controllerEpochZkVersion)
assertEquals(Seq("0000000001"), zkClient.getAllLogDirEventNotifications)
zkClient.propagateLogDirEvent(anotherBrokerId)
- zkClient.deleteLogDirEventNotifications()
+ zkClient.deleteLogDirEventNotifications(controllerEpochZkVersion)
assertEquals(Seq.empty, zkClient.getAllLogDirEventNotifications)
}
@@ -368,14 +393,18 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
new TopicPartition("topic_b", 0) -> Seq(4, 5),
new TopicPartition("topic_c", 0) -> Seq(5, 3)
)
- zkClient.setOrCreatePartitionReassignment(reassignment)
+
+ // Should throw ControllerMovedException if the controller epoch zkVersion
does not match
+
intercept[ControllerMovedException](zkClient.setOrCreatePartitionReassignment(reassignment,
controllerEpochZkVersion + 1))
+
+ zkClient.setOrCreatePartitionReassignment(reassignment,
controllerEpochZkVersion)
assertEquals(reassignment, zkClient.getPartitionReassignment)
- val updatedReassingment = reassignment - new TopicPartition("topic_b", 0)
- zkClient.setOrCreatePartitionReassignment(updatedReassingment)
- assertEquals(updatedReassingment, zkClient.getPartitionReassignment)
+ val updatedReassignment = reassignment - new TopicPartition("topic_b", 0)
+ zkClient.setOrCreatePartitionReassignment(updatedReassignment,
controllerEpochZkVersion)
+ assertEquals(updatedReassignment, zkClient.getPartitionReassignment)
- zkClient.deletePartitionReassignment()
+ zkClient.deletePartitionReassignment(controllerEpochZkVersion)
assertEquals(Map.empty, zkClient.getPartitionReassignment)
zkClient.createPartitionReassignment(reassignment)
@@ -513,9 +542,9 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
@Test
def testDeleteTopicZNode(): Unit = {
- zkClient.deleteTopicZNode(topic1)
+ zkClient.deleteTopicZNode(topic1, controllerEpochZkVersion)
zkClient.createRecursive(TopicZNode.path(topic1))
- zkClient.deleteTopicZNode(topic1)
+ zkClient.deleteTopicZNode(topic1, controllerEpochZkVersion)
assertFalse(zkClient.pathExists(TopicZNode.path(topic1)))
}
@@ -530,7 +559,10 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
assertTrue(zkClient.isTopicMarkedForDeletion(topic1))
assertEquals(Set(topic1, topic2), zkClient.getTopicDeletions.toSet)
- zkClient.deleteTopicDeletions(Seq(topic1, topic2))
+
intercept[ControllerMovedException](zkClient.deleteTopicDeletions(Seq(topic1,
topic2), controllerEpochZkVersion + 1))
+ assertEquals(Set(topic1, topic2), zkClient.getTopicDeletions.toSet)
+
+ zkClient.deleteTopicDeletions(Seq(topic1, topic2),
controllerEpochZkVersion)
assertTrue(zkClient.getTopicDeletions.isEmpty)
}
@@ -564,7 +596,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
zkClient.setOrCreateEntityConfigs(ConfigType.Topic, topic2, logProps)
assertEquals(Set(topic1, topic2),
zkClient.getAllEntitiesWithConfig(ConfigType.Topic).toSet)
- zkClient.deleteTopicConfigs(Seq(topic1, topic2))
+ zkClient.deleteTopicConfigs(Seq(topic1, topic2), controllerEpochZkVersion)
assertTrue(zkClient.getEntityConfigs(ConfigType.Topic, topic1).isEmpty)
}
@@ -742,22 +774,26 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
Map(
topicPartition10 -> (classOf[NoNodeException], "KeeperErrorCode =
NoNode for /brokers/topics/topic1/partitions/0/state"),
topicPartition11 -> (classOf[NoNodeException], "KeeperErrorCode =
NoNode for /brokers/topics/topic1/partitions/1/state")),
- zkClient.updateLeaderAndIsr(initialLeaderIsrs, controllerEpoch = 4))
+ zkClient.updateLeaderAndIsr(initialLeaderIsrs, controllerEpoch = 4,
controllerEpochZkVersion))
+
+
zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs,
controllerEpochZkVersion)
- zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs)
+ // Mismatch controller epoch zkVersion
+
intercept[ControllerMovedException](zkClient.updateLeaderAndIsr(initialLeaderIsrs,
controllerEpoch = 4, controllerEpochZkVersion + 1))
+ // successful updates
checkUpdateLeaderAndIsrResult(
leaderIsrs(state = 1, zkVersion = 1),
mutable.ArrayBuffer.empty,
Map.empty,
- zkClient.updateLeaderAndIsr(leaderIsrs(state = 1, zkVersion =
0),controllerEpoch = 4))
+ zkClient.updateLeaderAndIsr(leaderIsrs(state = 1, zkVersion =
0),controllerEpoch = 4, controllerEpochZkVersion))
// Try to update with wrong ZK version
checkUpdateLeaderAndIsrResult(
Map.empty,
ArrayBuffer(topicPartition10, topicPartition11),
Map.empty,
- zkClient.updateLeaderAndIsr(leaderIsrs(state = 1, zkVersion =
0),controllerEpoch = 4))
+ zkClient.updateLeaderAndIsr(leaderIsrs(state = 1, zkVersion =
0),controllerEpoch = 4, controllerEpochZkVersion))
// Trigger successful, to be retried and failed partitions in same call
val mixedState = Map(
@@ -770,7 +806,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
ArrayBuffer(topicPartition11),
Map(
topicPartition20 -> (classOf[NoNodeException], "KeeperErrorCode =
NoNode for /brokers/topics/topic2/partitions/0/state")),
- zkClient.updateLeaderAndIsr(mixedState, controllerEpoch = 4))
+ zkClient.updateLeaderAndIsr(mixedState, controllerEpoch = 4,
controllerEpochZkVersion))
}
private def checkGetDataResponse(
@@ -786,9 +822,9 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
TopicPartitionStateZNode.decode(response.data,
statWithVersion(zkVersion)))
}
- private def eraseMetadata(response: CreateResponse): CreateResponse =
- response.copy(metadata = ResponseMetadata(0, 0))
-
+ private def eraseUncheckedInfoInCreateResponse(response: CreateResponse):
CreateResponse =
+ response.copy(metadata = ResponseMetadata(0, 0), zkVersionCheckResult =
None)
+
@Test
def testGetTopicsAndPartitions(): Unit = {
assertTrue(zkClient.getAllTopicsInCluster.isEmpty)
@@ -800,7 +836,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
assertTrue(zkClient.getAllPartitions.isEmpty)
- zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs)
+
zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs,
controllerEpochZkVersion)
assertEquals(Set(topicPartition10, topicPartition11),
zkClient.getAllPartitions)
}
@@ -808,14 +844,17 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
def testCreateAndGetTopicPartitionStatesRaw(): Unit = {
zkClient.createRecursive(TopicZNode.path(topic1))
+ // Mismatch controller epoch zkVersion
+
intercept[ControllerMovedException](zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs,
controllerEpochZkVersion + 1))
+
assertEquals(
Seq(
CreateResponse(Code.OK,
TopicPartitionStateZNode.path(topicPartition10), Some(topicPartition10),
TopicPartitionStateZNode.path(topicPartition10), ResponseMetadata(0,
0)),
CreateResponse(Code.OK,
TopicPartitionStateZNode.path(topicPartition11), Some(topicPartition11),
TopicPartitionStateZNode.path(topicPartition11), ResponseMetadata(0,
0))),
-
zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs)
- .map(eraseMetadata).toList)
+
zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs,
controllerEpochZkVersion)
+ .map(eraseUncheckedInfoInCreateResponse).toList)
val getResponses =
zkClient.getTopicPartitionStatesRaw(topicPartitions10_11)
assertEquals(2, getResponses.size)
@@ -824,11 +863,9 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
// Trying to create existing topicPartition states fails
assertEquals(
Seq(
- CreateResponse(Code.NODEEXISTS,
TopicPartitionStateZNode.path(topicPartition10), Some(topicPartition10),
- null, ResponseMetadata(0, 0)),
- CreateResponse(Code.NODEEXISTS,
TopicPartitionStateZNode.path(topicPartition11), Some(topicPartition11),
- null, ResponseMetadata(0, 0))),
-
zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs).map(eraseMetadata).toList)
+ CreateResponse(Code.NODEEXISTS,
TopicPartitionStateZNode.path(topicPartition10), Some(topicPartition10), null,
ResponseMetadata(0, 0)),
+ CreateResponse(Code.NODEEXISTS,
TopicPartitionStateZNode.path(topicPartition11), Some(topicPartition11), null,
ResponseMetadata(0, 0))),
+
zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs,
controllerEpochZkVersion).map(eraseUncheckedInfoInCreateResponse).toList)
}
@Test
@@ -837,7 +874,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
def expectedSetDataResponses(topicPartitions: TopicPartition*)(resultCode:
Code, stat: Stat) =
topicPartitions.map { topicPartition =>
SetDataResponse(resultCode,
TopicPartitionStateZNode.path(topicPartition),
- Some(topicPartition), stat, ResponseMetadata(0, 0))
+ Some(topicPartition), stat, ResponseMetadata(0, 0), None)
}
zkClient.createRecursive(TopicZNode.path(topic1))
@@ -845,16 +882,18 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
// Trying to set non-existing topicPartition's data results in NONODE
responses
assertEquals(
expectedSetDataResponses(topicPartition10,
topicPartition11)(Code.NONODE, null),
-
zkClient.setTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs).map {
- _.copy(metadata = ResponseMetadata(0, 0))}.toList)
+ zkClient.setTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs,
controllerEpochZkVersion).map {
+ _.copy(metadata = ResponseMetadata(0, 0), zkVersionCheckResult =
None)}.toList)
- zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs)
+
zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs,
controllerEpochZkVersion)
assertEquals(
expectedSetDataResponses(topicPartition10, topicPartition11)(Code.OK,
statWithVersion(1)),
- zkClient.setTopicPartitionStatesRaw(leaderIsrAndControllerEpochs(state =
1, zkVersion = 0)).map {
- eraseMetadataAndStat}.toList)
+ zkClient.setTopicPartitionStatesRaw(leaderIsrAndControllerEpochs(state =
1, zkVersion = 0), controllerEpochZkVersion).map {
+ eraseUncheckedInfoInSetDataResponse}.toList)
+ // Mismatch controller epoch zkVersion
+
intercept[ControllerMovedException](zkClient.setTopicPartitionStatesRaw(leaderIsrAndControllerEpochs(state
= 1, zkVersion = 0), controllerEpochZkVersion + 1))
val getResponses =
zkClient.getTopicPartitionStatesRaw(topicPartitions10_11)
assertEquals(2, getResponses.size)
@@ -863,8 +902,8 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
// Other ZK client can also write the state of a partition
assertEquals(
expectedSetDataResponses(topicPartition10, topicPartition11)(Code.OK,
statWithVersion(2)),
-
otherZkClient.setTopicPartitionStatesRaw(leaderIsrAndControllerEpochs(state =
2, zkVersion = 1)).map {
- eraseMetadataAndStat}.toList)
+
otherZkClient.setTopicPartitionStatesRaw(leaderIsrAndControllerEpochs(state =
2, zkVersion = 1), controllerEpochZkVersion).map {
+ eraseUncheckedInfoInSetDataResponse}.toList)
}
@Test
@@ -881,7 +920,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
zkClient.createRecursive(TopicZNode.path(topic1))
- zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs)
+
zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs,
controllerEpochZkVersion)
assertEquals(
initialLeaderIsrAndControllerEpochs,
zkClient.getTopicPartitionStates(Seq(topicPartition10, topicPartition11))
@@ -906,36 +945,38 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
}
- private def eraseMetadataAndStat(response: SetDataResponse): SetDataResponse
= {
+ private def eraseUncheckedInfoInSetDataResponse(response: SetDataResponse):
SetDataResponse = {
val stat = if (response.stat != null)
statWithVersion(response.stat.getVersion) else null
- response.copy(metadata = ResponseMetadata(0, 0), stat = stat)
+ response.copy(metadata = ResponseMetadata(0, 0), stat = stat,
zkVersionCheckResult = None)
}
@Test
def testControllerEpochMethods(): Unit = {
+ zkClient.deletePath(ControllerEpochZNode.path)
+
assertEquals(None, zkClient.getControllerEpoch)
assertEquals("Setting non existing nodes should return NONODE results",
SetDataResponse(Code.NONODE, ControllerEpochZNode.path, None, null,
ResponseMetadata(0, 0)),
- eraseMetadataAndStat(zkClient.setControllerEpochRaw(1, 0)))
+ eraseUncheckedInfoInSetDataResponse(zkClient.setControllerEpochRaw(1,
0)))
assertEquals("Creating non existing nodes is OK",
CreateResponse(Code.OK, ControllerEpochZNode.path, None,
ControllerEpochZNode.path, ResponseMetadata(0, 0)),
- eraseMetadata(zkClient.createControllerEpochRaw(0)))
+ eraseUncheckedInfoInCreateResponse(zkClient.createControllerEpochRaw(0)))
assertEquals(0, zkClient.getControllerEpoch.get._1)
assertEquals("Attemt to create existing nodes should return NODEEXISTS",
CreateResponse(Code.NODEEXISTS, ControllerEpochZNode.path, None, null,
ResponseMetadata(0, 0)),
- eraseMetadata(zkClient.createControllerEpochRaw(0)))
+ eraseUncheckedInfoInCreateResponse(zkClient.createControllerEpochRaw(0)))
assertEquals("Updating existing nodes is OK",
SetDataResponse(Code.OK, ControllerEpochZNode.path, None,
statWithVersion(1), ResponseMetadata(0, 0)),
- eraseMetadataAndStat(zkClient.setControllerEpochRaw(1, 0)))
+ eraseUncheckedInfoInSetDataResponse(zkClient.setControllerEpochRaw(1,
0)))
assertEquals(1, zkClient.getControllerEpoch.get._1)
assertEquals("Updating with wrong ZK version returns BADVERSION",
SetDataResponse(Code.BADVERSION, ControllerEpochZNode.path, None, null,
ResponseMetadata(0, 0)),
- eraseMetadataAndStat(zkClient.setControllerEpochRaw(1, 0)))
+ eraseUncheckedInfoInSetDataResponse(zkClient.setControllerEpochRaw(1,
0)))
}
@Test
@@ -943,9 +984,9 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
// No controller
assertEquals(None, zkClient.getControllerId)
// Create controller
- zkClient.registerController(controllerId = 1, timestamp = 123456)
+ val (_, newEpochZkVersion) =
zkClient.registerControllerAndIncrementControllerEpoch(controllerId = 1)
assertEquals(Some(1), zkClient.getControllerId)
- zkClient.deleteController()
+ zkClient.deleteController(newEpochZkVersion)
assertEquals(None, zkClient.getControllerId)
}
@@ -1002,7 +1043,11 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
zkClient.createPreferredReplicaElection(electionPartitions)
}
- zkClient.deletePreferredReplicaElection()
+ // Mismatch controller epoch zkVersion
+
intercept[ControllerMovedException](zkClient.deletePreferredReplicaElection(controllerEpochZkVersion
+ 1))
+ assertEquals(electionPartitions, zkClient.getPreferredReplicaElection)
+
+ zkClient.deletePreferredReplicaElection(controllerEpochZkVersion)
assertTrue(zkClient.getPreferredReplicaElection.isEmpty)
}