Github user aarondav commented on a diff in the pull request:

    https://github.com/apache/incubator-spark/pull/611#discussion_r9875482
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala ---
    @@ -18,105 +18,73 @@
     package org.apache.spark.deploy.master
     
     import akka.actor.ActorRef
    -import org.apache.zookeeper._
    -import org.apache.zookeeper.Watcher.Event.EventType
     
     import org.apache.spark.{SparkConf, Logging}
     import org.apache.spark.deploy.master.MasterMessages._
    +import org.apache.curator.framework.CuratorFramework
    +import org.apache.curator.framework.recipes.leader.{LeaderLatchListener, 
LeaderLatch}
     
     private[spark] class ZooKeeperLeaderElectionAgent(val masterActor: 
ActorRef,
         masterUrl: String, conf: SparkConf)
    -  extends LeaderElectionAgent with SparkZooKeeperWatcher with Logging  {
    +  extends LeaderElectionAgent with LeaderLatchListener with Logging  {
     
       val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + 
"/leader_election"
     
    -  private val watcher = new ZooKeeperWatcher()
    -  private val zk = new SparkZooKeeperSession(this, conf)
    +  private var zk: CuratorFramework = _
    +  private var leaderLatch: LeaderLatch = _
       private var status = LeadershipStatus.NOT_LEADER
    -  private var myLeaderFile: String = _
    -  private var leaderUrl: String = _
     
       override def preStart() {
    +
         logInfo("Starting ZooKeeper LeaderElection agent")
    -    zk.connect()
    -  }
    +    zk = SparkCuratorUtil.newClient(conf)
    +    leaderLatch = new LeaderLatch(zk, WORKING_DIR)
    +    leaderLatch.addListener(this)
     
    -  override def zkSessionCreated() {
    -    synchronized {
    -      zk.mkdirRecursive(WORKING_DIR)
    -      myLeaderFile =
    -        zk.create(WORKING_DIR + "/master_", masterUrl.getBytes, 
CreateMode.EPHEMERAL_SEQUENTIAL)
    -      self ! CheckLeader
    -    }
    +    leaderLatch.start()
       }
     
       override def preRestart(reason: scala.Throwable, message: 
scala.Option[scala.Any]) {
    -    logError("LeaderElectionAgent failed, waiting " + zk.ZK_TIMEOUT_MILLIS 
+ "...", reason)
    -    Thread.sleep(zk.ZK_TIMEOUT_MILLIS)
    +    logError("LeaderElectionAgent failed...", reason)
         super.preRestart(reason, message)
       }
     
    -  override def zkDown() {
    -    logError("ZooKeeper down! LeaderElectionAgent shutting down Master.")
    -    System.exit(1)
    -  }
    -
       override def postStop() {
    +    leaderLatch.close()
         zk.close()
       }
     
       override def receive = {
    -    case CheckLeader => checkLeader()
    +    case _ =>
       }
     
    -  private class ZooKeeperWatcher extends Watcher {
    -    def process(event: WatchedEvent) {
    -      if (event.getType == EventType.NodeDeleted) {
    -        logInfo("Leader file disappeared, a master is down!")
    -        self ! CheckLeader
    +  override def isLeader() {
    +    // In case that leadship gain and lost in a short time.
    +    Thread.sleep(1000)
    --- End diff --
    
    Ah, sorry if I was unclear, but I was just joking about putting a
    sleep(1000) in here. The real solution is to add a synchronized block to
    both isLeader and notLeader -- my point was only that we're not concerned
    with the overhead of synchronization in this code path. (The synchronized
    block is not strictly needed with the current implementation and our use
    of Curator, but I think it makes the code clearer without any real
    downside.)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. To do so, please top-post your response.
If your project does not have this feature enabled and would like it, or if
the feature is enabled but not working, please contact infrastructure at
infrastruct...@apache.org or file a JIRA ticket with INFRA.
---

Reply via email to