Github user aarondav commented on a diff in the pull request: https://github.com/apache/incubator-spark/pull/611#discussion_r9875482 --- Diff: core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala --- @@ -18,105 +18,73 @@ package org.apache.spark.deploy.master import akka.actor.ActorRef -import org.apache.zookeeper._ -import org.apache.zookeeper.Watcher.Event.EventType import org.apache.spark.{SparkConf, Logging} import org.apache.spark.deploy.master.MasterMessages._ +import org.apache.curator.framework.CuratorFramework +import org.apache.curator.framework.recipes.leader.{LeaderLatchListener, LeaderLatch} private[spark] class ZooKeeperLeaderElectionAgent(val masterActor: ActorRef, masterUrl: String, conf: SparkConf) - extends LeaderElectionAgent with SparkZooKeeperWatcher with Logging { + extends LeaderElectionAgent with LeaderLatchListener with Logging { val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/leader_election" - private val watcher = new ZooKeeperWatcher() - private val zk = new SparkZooKeeperSession(this, conf) + private var zk: CuratorFramework = _ + private var leaderLatch: LeaderLatch = _ private var status = LeadershipStatus.NOT_LEADER - private var myLeaderFile: String = _ - private var leaderUrl: String = _ override def preStart() { + logInfo("Starting ZooKeeper LeaderElection agent") - zk.connect() - } + zk = SparkCuratorUtil.newClient(conf) + leaderLatch = new LeaderLatch(zk, WORKING_DIR) + leaderLatch.addListener(this) - override def zkSessionCreated() { - synchronized { - zk.mkdirRecursive(WORKING_DIR) - myLeaderFile = - zk.create(WORKING_DIR + "/master_", masterUrl.getBytes, CreateMode.EPHEMERAL_SEQUENTIAL) - self ! CheckLeader - } + leaderLatch.start() } override def preRestart(reason: scala.Throwable, message: scala.Option[scala.Any]) { - logError("LeaderElectionAgent failed, waiting " + zk.ZK_TIMEOUT_MILLIS + "...", reason) - Thread.sleep(zk.ZK_TIMEOUT_MILLIS) + logError("LeaderElectionAgent failed...", reason) super.preRestart(reason, message) } - override def zkDown() { - logError("ZooKeeper down! LeaderElectionAgent shutting down Master.") - System.exit(1) - } - override def postStop() { + leaderLatch.close() zk.close() } override def receive = { - case CheckLeader => checkLeader() + case _ => } - private class ZooKeeperWatcher extends Watcher { - def process(event: WatchedEvent) { - if (event.getType == EventType.NodeDeleted) { - logInfo("Leader file disappeared, a master is down!") - self ! CheckLeader + override def isLeader() { + // In case that leadship gain and lost in a short time. + Thread.sleep(1000) --- End diff -- Ah, sorry if I was unclear, but I was just joking about putting a sleep(1000) in here. The real solution is to add a synchronized block to isLeader and notLeader -- I was just making a point that we're not concerned with the overhead of synchronization in this code path. (The synchronized block is not needed with the current implementation and use of Curator, but I think it makes the code clearer without a real downside.)
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. To do so, please top-post your response. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---