KAFKA-992 followup: Fix zookeeper de-registration bug for controller and consumer; reviewed by Neha Narkhede
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1db824ed
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1db824ed
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1db824ed

Branch: refs/heads/trunk
Commit: 1db824ed2fcdaa45c3b1d0dcbf9101299fded09c
Parents: 1d75e09
Author: Guozhang Wang <[email protected]>
Authored: Fri Aug 9 15:44:00 2013 -0700
Committer: Neha Narkhede <[email protected]>
Committed: Fri Aug 9 15:44:11 2013 -0700

----------------------------------------------------------------------
 .../consumer/ZookeeperConsumerConnector.scala  | 30 +++++-
 .../kafka/controller/KafkaController.scala     |  3 +-
 .../kafka/server/ZookeeperLeaderElector.scala  | 98 ++++++++++++++++----
 core/src/main/scala/kafka/utils/ZkUtils.scala  | 20 +++-
 .../unit/kafka/server/LeaderElectionTest.scala |  2 +-
 5 files changed, 126 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1db824ed/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 0ca2850..17977e7 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -213,13 +213,35 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   // this API is used by unit tests only
   def getTopicRegistry: Pool[String, Pool[Int, PartitionTopicInfo]] = topicRegistry
 
-  private def registerConsumerInZK(dirs: ZKGroupDirs, consumerIdString: String, topicCount: TopicCount) = {
+  private def registerConsumerInZK(dirs: ZKGroupDirs, consumerIdString: String, topicCount: TopicCount) {
     info("begin registering consumer " + consumerIdString + " in ZK")
+    val timestamp = SystemTime.milliseconds.toString
     val consumerRegistrationInfo =
       Utils.mergeJsonFields(Utils.mapToJsonFields(Map("version" -> 1.toString, "subscription" -> topicCount.dbString), valueInQuotes = false)
-                             ++ Utils.mapToJsonFields(Map("pattern" -> topicCount.pattern), valueInQuotes = true))
-    createEphemeralPathExpectConflict(zkClient, dirs.consumerRegistryDir + "/" + consumerIdString, consumerRegistrationInfo)
-    info("end registering consumer " + consumerIdString + " in ZK")
+                             ++ Utils.mapToJsonFields(Map("pattern" -> topicCount.pattern, "timestamp" -> timestamp), valueInQuotes = true))
+
+    val consumerRegistryPath = dirs.consumerRegistryDir + "/" + consumerIdString
+    var registered = false
+    while (!registered) {
+      try {
+        createEphemeralPathExpectConflict(zkClient, consumerRegistryPath, consumerRegistrationInfo)
+        registered = true
+      } catch {
+        case e: ZkNodeExistsException =>
+          // An ephemeral node may outlive the session that created it (a ZooKeeper bug), so the
+          // conflict can be this consumer's own registration from an expired session; keep retrying
+          // until ZooKeeper deletes that node and the create succeeds.
+          val conflictingNode = ZkUtils.readDataMaybeNull(zkClient, consumerRegistryPath)._1
+          if (conflictingNode.isDefined) {
+            info("I wrote this conflicted ephemeral node a while back in a different session, "
+              + "hence I will backoff for this node to be deleted by Zookeeper after session timeout and retry")
+            Thread.sleep(config.zkSessionTimeoutMs)
+          }
+          // Otherwise the conflicting node is already gone: retry the create right away.
+      }
+    }
+
+    info("end registering consumer " + consumerIdString + " in ZK")
   }
 
   private def sendShutdownToAllQueues() = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/1db824ed/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala
b/core/src/main/scala/kafka/controller/KafkaController.scala index c87caab..800f900 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -37,6 +37,7 @@ import scala.Some import kafka.common.TopicAndPartition class ControllerContext(val zkClient: ZkClient, + val zkSessionTimeout: Int, var controllerChannelManager: ControllerChannelManager = null, val controllerLock: Object = new Object, var shuttingDownBrokerIds: mutable.Set[Int] = mutable.Set.empty, @@ -83,7 +84,7 @@ object KafkaController { class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logging with KafkaMetricsGroup with KafkaControllerMBean { this.logIdent = "[Controller " + config.brokerId + "]: " private var isRunning = true - val controllerContext = new ControllerContext(zkClient) + val controllerContext = new ControllerContext(zkClient, config.zkSessionTimeoutMs) private val partitionStateMachine = new PartitionStateMachine(this) private val replicaStateMachine = new ReplicaStateMachine(this) private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover, http://git-wip-us.apache.org/repos/asf/kafka/blob/1db824ed/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala index 574922b..d785db9 100644 --- a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala +++ b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala @@ -17,10 +17,11 @@ package kafka.server import kafka.utils.ZkUtils._ -import kafka.utils.Logging +import kafka.utils.{Json, Utils, SystemTime, Logging} import org.I0Itec.zkclient.exception.ZkNodeExistsException import org.I0Itec.zkclient.IZkDataListener import kafka.controller.ControllerContext 
+import kafka.common.KafkaException
 
 /**
  * This class handles zookeeper based leader election based on an ephemeral path. The election module does not handle
@@ -46,23 +47,51 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext, electionPath:
 
   def elect: Boolean = {
     controllerContext.zkClient.subscribeDataChanges(electionPath, leaderChangeListener)
-    try {
-      createEphemeralPathExpectConflict(controllerContext.zkClient, electionPath, brokerId.toString)
-      info(brokerId + " successfully elected as leader")
-      leaderId = brokerId
-      onBecomingLeader()
-    } catch {
-      case e: ZkNodeExistsException =>
-        // If someone else has written the path, then
-        val data: String = controllerContext.zkClient.readData(electionPath, true)
-        debug("Broker %d was elected as leader instead of broker %d".format(data.toInt, brokerId))
-        if (data != null) {
-          leaderId = data.toInt
-        }
-      case e2 =>
-        error("Error while electing or becoming leader on broker %d".format(brokerId), e2)
-        resign()
-    }
+    val timestamp = SystemTime.milliseconds.toString
+    val electString =
+      Utils.mergeJsonFields(Utils.mapToJsonFields(Map("version" -> 1.toString, "brokerid" -> brokerId.toString), valueInQuotes = false)
+        ++ Utils.mapToJsonFields(Map("timestamp" -> timestamp), valueInQuotes = true))
+
+    // electNotDone is reset to false at the top of every pass and set back to true only on the
+    // paths that must attempt the election again.
+    var electNotDone = true
+    do {
+      electNotDone = false
+      try {
+        createEphemeralPathExpectConflict(controllerContext.zkClient, electionPath, electString)
+
+        info(brokerId + " successfully elected as leader")
+        leaderId = brokerId
+        onBecomingLeader()
+      } catch {
+        case e: ZkNodeExistsException =>
+          readDataMaybeNull(controllerContext.zkClient, electionPath)._1 match {
+            // If someone else has written the path, then read the broker id
+            case Some(controllerString) =>
+              // Accepts both the JSON written above and the plain broker id written by older versions;
+              // throws KafkaException if the data is in neither format
+              leaderId = parseControllerId(controllerString)
+              if (leaderId != brokerId) {
+                info("Broker %d was elected as leader instead of broker %d".format(leaderId, brokerId))
+              } else {
+                // An ephemeral node may outlive the session that created it (a ZooKeeper bug), so this can
+                // be our own registration from an expired session: wait for ZooKeeper to delete it and retry.
+                // An InterruptedException from the sleep propagates to the caller.
+                info("I wrote this conflicted ephemeral node a while back in a different session, "
+                  + "hence I will retry")
+                electNotDone = true
+                Thread.sleep(controllerContext.zkSessionTimeout)
+              }
+            case None =>
+              // The node disappeared between the failed create and the read: retry immediately
+              electNotDone = true
+          }
+        case e2 =>
+          error("Error while electing or becoming leader on broker %d".format(brokerId), e2)
+          resign()
+      }
+    } while (electNotDone)
+
     amILeader
   }
 
@@ -88,8 +117,9 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext, electionPath:
     @throws(classOf[Exception])
     def handleDataChange(dataPath: String, data: Object) {
       controllerContext.controllerLock synchronized {
-        leaderId = data.toString.toInt
+        // Accepts both the JSON registration and the plain broker id written by older versions
+        leaderId = parseControllerId(data.toString)
         info("New leader is %d".format(leaderId))
       }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/1db824ed/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 0072a1a..9772af8 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -54,7 +54,39 @@ object ZkUtils extends Logging {
 
+  /**
+   * Extract the controller's broker id from the data stored at the controller path.
+   * Both the JSON registration (a map with a "brokerid" field) and the older format, in which
+   * the path holds just the broker id as a plain integer, are accepted.
+   * @param controllerInfoString the data read from the controller path
+   * @return the broker id of the controller
+   * @throws KafkaException if the data is in neither format
+   */
+  def parseControllerId(controllerInfoString: String): Int = {
+    val idFromJson: Option[Int] =
+      try {
+        Json.parseFull(controllerInfoString) match {
+          case Some(m) => m.asInstanceOf[Map[String, Any]].get("brokerid").map(_.asInstanceOf[Int])
+          case None => None
+        }
+      } catch {
+        // Unparsable or unexpectedly shaped data falls through to the old-format check below
+        case e: Exception => None
+      }
+    idFromJson.getOrElse {
+      // It may be due to an incompatible controller register version
+      info("Failed to parse the controller info as json. "
+        + "Probably this controller is still using the old format [%s] of storing the broker id in the zookeeper path".format(controllerInfoString))
+      try {
+        controllerInfoString.toInt
+      } catch {
+        case e: NumberFormatException =>
+          throw new KafkaException("Failed to parse the leader info [%s] from zookeeper. This is neither the new or the old format.".format(controllerInfoString), e)
+      }
+    }
+  }
+
   def getController(zkClient: ZkClient): Int= {
     readDataMaybeNull(zkClient, ControllerPath)._1 match {
-      case Some(controller) => controller.toInt
+      case Some(controller) => parseControllerId(controller)
       case None => throw new KafkaException("Controller doesn't exist")
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1db824ed/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
index c4328f0..70e4b51 100644
--- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
@@ -124,7 +124,7 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
     val controllerId = 2
     val controllerConfig = new KafkaConfig(TestUtils.createBrokerConfig(controllerId, TestUtils.choosePort()))
     val brokers = servers.map(s => new Broker(s.config.brokerId, "localhost", s.config.port))
-    val controllerContext = new ControllerContext(zkClient)
+    val controllerContext = new ControllerContext(zkClient, 6000)
     controllerContext.liveBrokers = brokers.toSet
     val controllerChannelManager = new ControllerChannelManager(controllerContext, controllerConfig)
     controllerChannelManager.startup()
