Repository: incubator-gearpump
Updated Branches:
  refs/heads/master 3206770e7 -> 13ae50fb5


fix GEARPUMP-7 Master members are not updated if a master is shutdown

On behalf of Xiang

Author: huafengw <[email protected]>

Closes #8 from huafengw/fix_GEARPUMP-7.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/13ae50fb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/13ae50fb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/13ae50fb

Branch: refs/heads/master
Commit: 13ae50fb5990f42e5f690b446a038feed2e80fd2
Parents: 3206770
Author: huafengw <[email protected]>
Authored: Fri Apr 29 11:41:29 2016 +0800
Committer: manuzhang <[email protected]>
Committed: Fri Apr 29 11:41:29 2016 +0800

----------------------------------------------------------------------
 .../io/gearpump/cluster/ClusterMessage.scala    |  2 +-
 .../scala/io/gearpump/cluster/main/Master.scala | 21 ++++++++++++---
 .../io/gearpump/cluster/master/Master.scala     | 28 +++++++++++---------
 project/Build.scala                             |  2 +-
 4 files changed, 34 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/13ae50fb/core/src/main/scala/io/gearpump/cluster/ClusterMessage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/ClusterMessage.scala 
b/core/src/main/scala/io/gearpump/cluster/ClusterMessage.scala
index 5a42ea3..ed187a1 100644
--- a/core/src/main/scala/io/gearpump/cluster/ClusterMessage.scala
+++ b/core/src/main/scala/io/gearpump/cluster/ClusterMessage.scala
@@ -25,7 +25,7 @@ import com.typesafe.config.Config
 
 import io.gearpump.TimeStamp
 import io.gearpump.cluster.MasterToAppMaster.AppMasterStatus
-import io.gearpump.cluster.master.MasterSummary
+import io.gearpump.cluster.master.{MasterNode, MasterSummary}
 import io.gearpump.cluster.scheduler.{Resource, ResourceAllocation, 
ResourceRequest}
 import io.gearpump.cluster.worker.{WorkerId, WorkerSummary}
 import io.gearpump.metrics.Metrics.MetricType

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/13ae50fb/daemon/src/main/scala/io/gearpump/cluster/main/Master.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/cluster/main/Master.scala 
b/daemon/src/main/scala/io/gearpump/cluster/main/Master.scala
index 6a4ac07..eac1c54 100644
--- a/daemon/src/main/scala/io/gearpump/cluster/main/Master.scala
+++ b/daemon/src/main/scala/io/gearpump/cluster/main/Master.scala
@@ -33,7 +33,8 @@ import com.typesafe.config.ConfigValueFactory
 import org.slf4j.Logger
 
 import io.gearpump.cluster.ClusterConfig
-import io.gearpump.cluster.master.{Master => MasterActor}
+import io.gearpump.cluster.master.{Master => MasterActor, MasterNode}
+import io.gearpump.cluster.master.Master.MasterListUpdated
 import io.gearpump.util.Constants._
 import io.gearpump.util.LogUtil.ProcessType
 import io.gearpump.util.{AkkaApp, Constants, LogUtil}
@@ -176,15 +177,17 @@ class MasterWatcher(role: String) extends Actor with 
ActorLogging {
         context.become(waitForShutdown)
         self ! MasterWatcher.Shutdown
       } else {
-        context.actorOf(Props(classOf[MasterActor]), MASTER)
-        context.become(waitForClusterEvent)
+        val master = context.actorOf(Props(classOf[MasterActor]), MASTER)
+        notifyMasterMembersChange(master)
+        context.become(waitForClusterEvent(master))
       }
     }
   }
 
-  def waitForClusterEvent: Receive = {
+  def waitForClusterEvent(master: ActorRef): Receive = {
     case MemberUp(m) if matchingRole(m) => {
       membersByAge += m
+      notifyMasterMembersChange(master)
     }
     case mEvent: MemberEvent if (mEvent.isInstanceOf[MemberExited] ||
       mEvent.isInstanceOf[MemberRemoved]) && matchingRole(mEvent.member) => {
@@ -196,10 +199,20 @@ class MasterWatcher(role: String) extends Actor with 
ActorLogging {
           s"shutting down...${membersByAge.iterator.mkString(",")}")
         context.become(waitForShutdown)
         self ! MasterWatcher.Shutdown
+      } else {
+        notifyMasterMembersChange(master)
       }
     }
   }
 
+  private def notifyMasterMembersChange(master: ActorRef): Unit = {
+    val masters = membersByAge.toList.map{ member =>
+      MasterNode(member.address.host.getOrElse("Unknown-Host"),
+        member.address.port.getOrElse(0))
+    }
+    master ! MasterListUpdated(masters)
+  }
+
   def waitForShutdown: Receive = {
     case MasterWatcher.Shutdown => {
       cluster.unsubscribe(self)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/13ae50fb/daemon/src/main/scala/io/gearpump/cluster/master/Master.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/cluster/master/Master.scala 
b/daemon/src/main/scala/io/gearpump/cluster/master/Master.scala
index 0203237..0dfa381 100644
--- a/daemon/src/main/scala/io/gearpump/cluster/master/Master.scala
+++ b/daemon/src/main/scala/io/gearpump/cluster/master/Master.scala
@@ -86,6 +86,12 @@ private[cluster] class Master extends Actor with Stash {
 
   private val hostPort = 
HostPort(ActorUtil.getSystemAddress(context.system).hostPort)
 
+  // Maintain the list of active masters.
+  private var masters: List[MasterNode] = {
+    // Add myself into the list of initial masters.
+    List(MasterNode(hostPort.host, hostPort.port))
+  }
+
   val metricsEnabled = systemConfig.getBoolean(GEARPUMP_METRIC_ENABLED)
 
   val getHistoryMetricsConfig = HistoryMetricsConfig(systemConfig)
@@ -124,6 +130,7 @@ private[cluster] class Master extends Actor with Stash {
 
   def receiveHandler: Receive = workerMsgHandler orElse
     appMasterMsgHandler orElse
+    onMasterListChange orElse
     clientMsgHandler orElse
     metricsService orElse
     jarStoreService orElse
@@ -195,7 +202,7 @@ private[cluster] class Master extends Actor with Stash {
       val masterDescription =
         MasterSummary(
           MasterNode(hostPort.host, hostPort.port),
-          getMasterClusterList.map(hostPort => MasterNode(hostPort.host, 
hostPort.port)),
+          masters,
           aliveFor,
           logFileDir,
           jarStoreRootPath,
@@ -212,21 +219,13 @@ private[cluster] class Master extends Actor with Stash {
       appManager forward invalidAppMaster
   }
 
-  private def getMasterClusterList: List[HostPort] = {
-    val cluster = systemConfig.getStringList(GEARPUMP_CLUSTER_MASTERS)
-      .asScala.map(HostPort(_)).toList
-
-    if (cluster.isEmpty) {
+  import scala.util.{Failure, Success}
 
-      // Add myself into the list if it is a single node cluster
-      List(hostPort)
-    } else {
-      cluster
-    }
+  def onMasterListChange: Receive = {
+    case MasterListUpdated(masters: List[MasterNode]) =>
+      this.masters = masters
   }
 
-  import scala.util.{Failure, Success}
-
   def clientMsgHandler: Receive = {
     case app: SubmitApplication =>
       LOG.debug(s"Receive from client, SubmitApplication $app")
@@ -303,6 +302,9 @@ object Master {
 
   case class MasterInfo(master: ActorRef, startTime: Long = 0L)
 
+  /** Notify the subscriber that master actor list has been updated */
+  case class MasterListUpdated(masters: List[MasterNode])
+
   object MasterInfo {
     def empty: MasterInfo = MasterInfo(null)
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/13ae50fb/project/Build.scala
----------------------------------------------------------------------
diff --git a/project/Build.scala b/project/Build.scala
index ad775bd..b9e6f94 100644
--- a/project/Build.scala
+++ b/project/Build.scala
@@ -264,7 +264,7 @@ object Build extends sbt.Build {
   lazy val daemon = Project(
     id = "gearpump-daemon",
     base = file("daemon"),
-    settings = commonSettings ++ noPublish ++ daemonDependencies
+    settings = commonSettings ++ daemonDependencies
   ) dependsOn(core % "test->test; compile->compile", cgroup % "test->test; 
compile->compile")
 
   lazy val cgroup = Project(

Reply via email to