Revert "KAFKA-992 follow up: Fix the zookeeper de-registration issue for controller and consumer; reviewed by Neha Narkhede"
This reverts commit 81c49bbdae5e490f9d5dc7b042ee60e617fbb22b. Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1d75e093 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1d75e093 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1d75e093 Branch: refs/heads/trunk Commit: 1d75e09313b5b3f1cde6c39fdda20bc7185cfdf6 Parents: 81c49bb Author: Neha Narkhede <[email protected]> Authored: Fri Aug 9 15:29:08 2013 -0700 Committer: Neha Narkhede <[email protected]> Committed: Fri Aug 9 15:29:08 2013 -0700 ---------------------------------------------------------------------- .../consumer/ZookeeperConsumerConnector.scala | 30 +----- .../kafka/controller/KafkaController.scala | 3 +- .../kafka/server/ZookeeperLeaderElector.scala | 97 ++++---------------- core/src/main/scala/kafka/utils/ZkUtils.scala | 22 +---- .../unit/kafka/server/LeaderElectionTest.scala | 2 +- 5 files changed, 28 insertions(+), 126 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/1d75e093/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index 17977e7..0ca2850 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -213,35 +213,13 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, // this API is used by unit tests only def getTopicRegistry: Pool[String, Pool[Int, PartitionTopicInfo]] = topicRegistry - private def registerConsumerInZK(dirs: ZKGroupDirs, consumerIdString: String, topicCount: TopicCount) { + private def registerConsumerInZK(dirs: ZKGroupDirs, consumerIdString: String, topicCount: TopicCount) = { info("begin registering consumer " + consumerIdString + " in ZK") - val timestamp = SystemTime.milliseconds.toString val consumerRegistrationInfo = Utils.mergeJsonFields(Utils.mapToJsonFields(Map("version" -> 1.toString, "subscription" -> topicCount.dbString), valueInQuotes = false) - ++ Utils.mapToJsonFields(Map("pattern" -> topicCount.pattern, "timestamp" -> timestamp), valueInQuotes = true)) - - while (true) { - try { - createEphemeralPathExpectConflict(zkClient, dirs.consumerRegistryDir + "/" + consumerIdString, consumerRegistrationInfo) - - info("end registering consumer " + consumerIdString + " in ZK") - return - } catch { - case e: ZkNodeExistsException => { - // An ephemeral node may still exist even after its corresponding session has expired - // due to a Zookeeper bug, in this case we need to retry writing until the previous node is deleted - // and hence the write succeeds without ZkNodeExistsException - ZkUtils.readDataMaybeNull(zkClient, dirs.consumerRegistryDir + "/" + consumerIdString)._1 match { - case Some(consumerZKString) => { - info("I wrote this conflicted ephemeral node a while back in a different session, " - + "hence I will backoff for this node to be deleted by Zookeeper after session timeout and retry") - Thread.sleep(config.zkSessionTimeoutMs) - } - case None => // the node disappeared; retry creating the ephemeral node immediately - } - } - } - } + ++ Utils.mapToJsonFields(Map("pattern" -> topicCount.pattern), valueInQuotes = true)) + createEphemeralPathExpectConflict(zkClient, dirs.consumerRegistryDir + "/" + consumerIdString, consumerRegistrationInfo) + info("end registering consumer " + consumerIdString + " in ZK") } private def sendShutdownToAllQueues() = { http://git-wip-us.apache.org/repos/asf/kafka/blob/1d75e093/core/src/main/scala/kafka/controller/KafkaController.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 800f900..c87caab 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -37,7 +37,6 @@ import scala.Some import kafka.common.TopicAndPartition class ControllerContext(val zkClient: ZkClient, - val zkSessionTimeout: Int, var controllerChannelManager: ControllerChannelManager = null, val controllerLock: Object = new Object, var shuttingDownBrokerIds: mutable.Set[Int] = mutable.Set.empty, @@ -84,7 +83,7 @@ object KafkaController { class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logging with KafkaMetricsGroup with KafkaControllerMBean { this.logIdent = "[Controller " + config.brokerId + "]: " private var isRunning = true - val controllerContext = new ControllerContext(zkClient, config.zkSessionTimeoutMs) + val controllerContext = new ControllerContext(zkClient) private val partitionStateMachine = new PartitionStateMachine(this) private val replicaStateMachine = new ReplicaStateMachine(this) private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover, http://git-wip-us.apache.org/repos/asf/kafka/blob/1d75e093/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala index 6016bd5..574922b 100644 --- a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala +++ b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala @@ -17,11 +17,10 @@ package kafka.server import kafka.utils.ZkUtils._ -import kafka.utils.{Json, Utils, SystemTime, Logging} +import kafka.utils.Logging import org.I0Itec.zkclient.exception.ZkNodeExistsException import org.I0Itec.zkclient.IZkDataListener import kafka.controller.ControllerContext -import kafka.common.KafkaException /** * This class handles zookeeper based leader election based on an ephemeral path. The election module does not handle @@ -47,62 +46,23 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext, electionPath: def elect: Boolean = { controllerContext.zkClient.subscribeDataChanges(electionPath, leaderChangeListener) - val timestamp = SystemTime.milliseconds.toString - val electString = - Utils.mergeJsonFields(Utils.mapToJsonFields(Map("version" -> 1.toString, "brokerid" -> brokerId.toString), valueInQuotes = false) - ++ Utils.mapToJsonFields(Map("timestamp" -> timestamp), valueInQuotes = true)) - - var electNotDone = true - do { - electNotDone = false - try { - createEphemeralPathExpectConflict(controllerContext.zkClient, electionPath, electString) - - info(brokerId + " successfully elected as leader") - leaderId = brokerId - onBecomingLeader() - } catch { - case e: ZkNodeExistsException => - readDataMaybeNull(controllerContext.zkClient, electionPath)._1 match { - // If someone else has written the path, then read the broker id - case Some(controllerString) => - try { - Json.parseFull(controllerString) match { - case Some(m) => - val controllerInfo = m.asInstanceOf[Map[String, Any]] - leaderId = controllerInfo.get("brokerid").get.asInstanceOf[Int] - if (leaderId != brokerId) { - info("Broker %d was elected as leader instead of broker %d".format(leaderId, brokerId)) - } else { - info("I wrote this conflicted ephemeral node a while back in a different session, " - + "hence I will retry") - electNotDone = true - Thread.sleep(controllerContext.zkSessionTimeout) - } - case None => - error("Error while reading leader info %s on broker %d".format(controllerString, brokerId)) - resign() - } - } catch { - case t => - // It may be due to an incompatible controller register version - info("Failed to parse the controller info as json. " + - "Probably this controller is still using the old format [%s] of storing the broker id in the zookeeper path".format(controllerString)) - try { - leaderId = controllerString.toInt - info("Broker %d was elected as leader instead of broker %d".format(leaderId, brokerId)) - } catch { - case t => throw new KafkaException("Failed to parse the leader info [%s] from zookeeper. This is neither the new or the old format.", t) - } - } - } - // If the node disappear, retry immediately - case e2 => - error("Error while electing or becoming leader on broker %d".format(brokerId), e2) - resign() - } - } while (electNotDone) - + try { + createEphemeralPathExpectConflict(controllerContext.zkClient, electionPath, brokerId.toString) + info(brokerId + " successfully elected as leader") + leaderId = brokerId + onBecomingLeader() + } catch { + case e: ZkNodeExistsException => + // If someone else has written the path, then + val data: String = controllerContext.zkClient.readData(electionPath, true) + debug("Broker %d was elected as leader instead of broker %d".format(data.toInt, brokerId)) + if (data != null) { + leaderId = data.toInt + } + case e2 => + error("Error while electing or becoming leader on broker %d".format(brokerId), e2) + resign() + } amILeader } @@ -128,25 +88,8 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext, electionPath: @throws(classOf[Exception]) def handleDataChange(dataPath: String, data: Object) { controllerContext.controllerLock synchronized { - try { - Json.parseFull(data.toString) match { - case Some(m) => - val controllerInfo = m.asInstanceOf[Map[String, Any]] - leaderId = controllerInfo.get("brokerid").get.asInstanceOf[Int] - info("New leader is %d".format(leaderId)) - case None => - error("Error while reading the leader info %s".format(data.toString)) - } - } catch { - case t => - // It may be due to an incompatible controller register version - try { - leaderId = data.toString.toInt - info("New leader is %d".format(leaderId)) - } catch { - case t => throw new KafkaException("Failed to parse the leader info from zookeeper: " + data, t) - } - } + leaderId = data.toString.toInt + info("New leader is %d".format(leaderId)) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/1d75e093/core/src/main/scala/kafka/utils/ZkUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index 8440d94..0072a1a 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -54,25 +54,7 @@ object ZkUtils extends Logging { def getController(zkClient: ZkClient): Int= { readDataMaybeNull(zkClient, ControllerPath)._1 match { - case Some(controller) => - try { - Json.parseFull(controller) match { - case Some(m) => - val controllerInfo = m.asInstanceOf[Map[String, Any]] - controllerInfo.get("brokerid").get.asInstanceOf[Int] - case None => throw new KafkaException("Failed to parse the controller info json [%s] from zookeeper.".format(controller)) - } - } catch { - case t => - // It may be due to an incompatible controller register version - info("Failed to parse the controller info as json. " + - "Probably this controller is still using the old format [%s] of storing the broker id in the zookeeper path".format(controller)) - try { - controller.toInt - } catch { - case t => throw new KafkaException("Failed to parse the controller info [%s] from zookeeper. This is neither the new or the old format.", t) - } - } + case Some(controller) => controller.toInt case None => throw new KafkaException("Controller doesn't exist") } } @@ -222,7 +204,7 @@ object ZkUtils extends Logging { case Some(brokerZKString) => { val broker = Broker.createBroker(id, brokerZKString) if (broker.host == host && broker.port == port) { - info("I wrote this conflicted ephemeral node a while back in a different session, " + info("I wrote this conflicted ephemeral node [%s] a while back in a different session, ".format(brokerZKString) + "hence I will backoff for this node to be deleted by Zookeeper after session timeout and retry") Thread.sleep(timeout) } else { http://git-wip-us.apache.org/repos/asf/kafka/blob/1d75e093/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala index 70e4b51..c4328f0 100644 --- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala +++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala @@ -124,7 +124,7 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness { val controllerId = 2 val controllerConfig = new KafkaConfig(TestUtils.createBrokerConfig(controllerId, TestUtils.choosePort())) val brokers = servers.map(s => new Broker(s.config.brokerId, "localhost", s.config.port)) - val controllerContext = new ControllerContext(zkClient, 6000) + val controllerContext = new ControllerContext(zkClient) controllerContext.liveBrokers = brokers.toSet val controllerChannelManager = new ControllerChannelManager(controllerContext, controllerConfig) controllerChannelManager.startup()
