mumrah commented on code in PR #12973:
URL: https://github.com/apache/kafka/pull/12973#discussion_r1046093236
##########
core/src/main/scala/kafka/zk/KafkaZkClient.scala:
##########
@@ -166,60 +166,74 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
* the migration.
*
* To ensure that the KRaft controller epoch exceeds the current ZK controller epoch, this registration algorithm
- * uses a conditional update on the /controller_epoch znode. If a new ZK controller is elected during this method,
- * the conditional update on /controller_epoch fails which causes the whole multi-op transaction to fail.
+ * uses a conditional update on the /controller and /controller_epoch znodes.
+ *
+ * If a new controller is registered concurrently with this registration, one of the two will fail the CAS
+ * operation on /controller_epoch. For KRaft, we have an extra guard against the registered KRaft epoch going
+ * backwards. If a KRaft controller had previously registered, an additional CAS operation is done on the /controller
+ * ZNode to ensure that the KRaft epoch being registered is newer.
*
* @param kraftControllerId ID of the KRaft controller node
* @param kraftControllerEpoch Epoch of the KRaft controller node
- * @return An optional of the new zkVersion of /controller_epoch. None if we could not register the KRaft controller.
+ * @return An optional of the written epoch and new zkVersion of /controller_epoch. None if we could not register the KRaft controller.
*/
- def tryRegisterKRaftControllerAsActiveController(kraftControllerId: Int, kraftControllerEpoch: Int): Option[Int] = {
+ def tryRegisterKRaftControllerAsActiveController(kraftControllerId: Int, kraftControllerEpoch: Int): Option[(Int, Int)] = {
val timestamp = time.milliseconds()
val curEpochOpt: Option[(Int, Int)] = getControllerEpoch.map(e => (e._1, e._2.getVersion))
- val controllerOpt = getControllerId
- val controllerEpochToStore = kraftControllerEpoch + 10000000 // TODO Remove this after KAFKA-14436
+ val controllerOpt = getControllerRegistration
+
+ // If we have a KRaft epoch registered in /controller, and it is not _older_ than the requested epoch, throw an error.
+ controllerOpt.flatMap(_.kraftEpoch).foreach { kraftEpochInZk =>
+ if (kraftEpochInZk >= kraftControllerEpoch) {
+ throw new ControllerMovedException(s"Cannot register KRaft controller $kraftControllerId with epoch $kraftControllerEpoch " +
+ s"as the current controller register in ZK has the same or newer epoch $kraftEpochInZk.")
+ }
+ }
+
curEpochOpt match {
case None =>
throw new IllegalStateException(s"Cannot register KRaft controller $kraftControllerId as the active controller " +
s"since there is no ZK controller epoch present.")
case Some((curEpoch: Int, curEpochZk: Int)) =>
- if (curEpoch >= controllerEpochToStore) {
- // TODO KAFKA-14436 Need to ensure KRaft has a higher epoch an ZK
- throw new IllegalStateException(s"Cannot register KRaft controller $kraftControllerId as the active controller " +
- s"in ZK since its epoch ${controllerEpochToStore} is not higher than the current ZK epoch ${curEpoch}.")
+ // TODO KAFKA-14436 Increase the KRaft epoch to be higher than the ZK epoch
+ val newControllerEpoch = if (kraftControllerEpoch >= curEpoch) {
+ kraftControllerEpoch
+ } else {
+ curEpoch + 1
}
- val response = if (controllerOpt.isDefined) {
- info(s"KRaft controller $kraftControllerId overwriting ${ControllerZNode.path} to become the active " +
- s"controller with epoch $controllerEpochToStore. The previous controller was ${controllerOpt.get}.")
- retryRequestUntilConnected(
- MultiRequest(Seq(
- SetDataOp(ControllerEpochZNode.path, ControllerEpochZNode.encode(controllerEpochToStore), curEpochZk),
- DeleteOp(ControllerZNode.path, ZkVersion.MatchAnyVersion),
- CreateOp(ControllerZNode.path, ControllerZNode.encode(kraftControllerId, timestamp),
- defaultAcls(ControllerZNode.path), CreateMode.PERSISTENT)))
- )
- } else {
- info(s"KRaft controller $kraftControllerId creating ${ControllerZNode.path} to become the active " +
- s"controller with epoch $controllerEpochToStore. There was no active controller.")
- retryRequestUntilConnected(
- MultiRequest(Seq(
- SetDataOp(ControllerEpochZNode.path, ControllerEpochZNode.encode(controllerEpochToStore), curEpochZk),
- CreateOp(ControllerZNode.path, ControllerZNode.encode(kraftControllerId, timestamp),
- defaultAcls(ControllerZNode.path), CreateMode.PERSISTENT)))
- )
+ val response = controllerOpt match {
+ case Some(controller) =>
+ info(s"KRaft controller $kraftControllerId overwriting ${ControllerZNode.path} to become the active " +
+ s"controller with ZK epoch $newControllerEpoch. The previous controller was ${controller.broker}.")
+ retryRequestUntilConnected(
+ MultiRequest(Seq(
+ SetDataOp(ControllerEpochZNode.path, ControllerEpochZNode.encode(newControllerEpoch), curEpochZk),
+ DeleteOp(ControllerZNode.path, controller.zkVersion),
Review Comment:
This is the main difference from the old algorithm. Because we now read
`/controller` first to learn the previous KRaft epoch, we pass the zkVersion
from that read to the DeleteOp instead of deleting unconditionally.
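
To make the effect of the versioned delete concrete, here is a minimal sketch
against a hypothetical in-memory model of versioned znodes. `ZNode`, `multi`,
`registrationOps` and the sample data are made up for illustration and are not
the KafkaZkClient or ZooKeeper API. It also assumes a contrived interleaving in
which `/controller` changes after our read while `/controller_epoch` does not.

```scala
// Hypothetical in-memory model of versioned znodes, for illustration only.
object ConditionalDeleteSketch {
  final case class ZNode(data: String, version: Int)

  sealed trait Op
  final case class SetData(path: String, data: String, expectedVersion: Int) extends Op
  final case class Delete(path: String, expectedVersion: Int) extends Op
  final case class Create(path: String, data: String) extends Op

  type Store = Map[String, ZNode]

  // Stand-in for "skip the version check".
  val MatchAnyVersion: Int = -1

  private def versionOk(node: ZNode, expected: Int): Boolean =
    expected == MatchAnyVersion || node.version == expected

  // All-or-nothing: returns the updated store, or None if any op's check fails.
  def multi(store: Store, ops: Seq[Op]): Option[Store] =
    ops.foldLeft(Option(store)) { (acc, op) =>
      acc.flatMap { s =>
        op match {
          case SetData(p, d, v) =>
            s.get(p).filter(versionOk(_, v)).map(n => s.updated(p, ZNode(d, n.version + 1)))
          case Delete(p, v) =>
            s.get(p).filter(versionOk(_, v)).map(_ => s - p)
          case Create(p, d) =>
            if (s.contains(p)) None else Some(s.updated(p, ZNode(d, 0)))
        }
      }
    }

  // Same shape as the multi-op in the diff: CAS the epoch, delete, re-create.
  def registrationOps(epochVersion: Int, deleteVersion: Int): Seq[Op] = Seq(
    SetData("/controller_epoch", "6", epochVersion),
    Delete("/controller", deleteVersion),
    Create("/controller", "kraftEpoch=10"))

  // State at the time the registering controller reads both znodes.
  val initial: Store = Map(
    "/controller_epoch" -> ZNode("5", 3),
    "/controller" -> ZNode("kraftEpoch=9", 0))

  // Assumed race: /controller is rewritten after our read, epoch znode untouched.
  val raced: Store = initial.updated("/controller", ZNode("kraftEpoch=11", 1))

  def main(args: Array[String]): Unit = {
    val epochVersion = initial("/controller_epoch").version
    val controllerVersion = initial("/controller").version

    // Old behaviour: the unconditional delete goes through despite the race.
    println(multi(raced, registrationOps(epochVersion, MatchAnyVersion)).isDefined) // true
    // New behaviour: the stale zkVersion fails the delete, so the whole multi-op fails.
    println(multi(raced, registrationOps(epochVersion, controllerVersion)).isDefined) // false
  }
}
```

In this model the versioned delete turns the read of `/controller` plus the
later write into a compare-and-set, so a stale read can no longer overwrite a
newer registration.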