Repository: incubator-gearpump Updated Branches: refs/heads/master 3206770e7 -> 13ae50fb5
fix GEARPUMP-7 Master members are not updated if a master is shutdown On behalf of Xiang Author: huafengw <[email protected]> Closes #8 from huafengw/fix_GEARPUMP-7. Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/13ae50fb Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/13ae50fb Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/13ae50fb Branch: refs/heads/master Commit: 13ae50fb5990f42e5f690b446a038feed2e80fd2 Parents: 3206770 Author: huafengw <[email protected]> Authored: Fri Apr 29 11:41:29 2016 +0800 Committer: manuzhang <[email protected]> Committed: Fri Apr 29 11:41:29 2016 +0800 ---------------------------------------------------------------------- .../io/gearpump/cluster/ClusterMessage.scala | 2 +- .../scala/io/gearpump/cluster/main/Master.scala | 21 ++++++++++++--- .../io/gearpump/cluster/master/Master.scala | 28 +++++++++++--------- project/Build.scala | 2 +- 4 files changed, 34 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/13ae50fb/core/src/main/scala/io/gearpump/cluster/ClusterMessage.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/cluster/ClusterMessage.scala b/core/src/main/scala/io/gearpump/cluster/ClusterMessage.scala index 5a42ea3..ed187a1 100644 --- a/core/src/main/scala/io/gearpump/cluster/ClusterMessage.scala +++ b/core/src/main/scala/io/gearpump/cluster/ClusterMessage.scala @@ -25,7 +25,7 @@ import com.typesafe.config.Config import io.gearpump.TimeStamp import io.gearpump.cluster.MasterToAppMaster.AppMasterStatus -import io.gearpump.cluster.master.MasterSummary +import io.gearpump.cluster.master.{MasterNode, MasterSummary} import io.gearpump.cluster.scheduler.{Resource, ResourceAllocation, ResourceRequest} import io.gearpump.cluster.worker.{WorkerId, WorkerSummary} import io.gearpump.metrics.Metrics.MetricType http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/13ae50fb/daemon/src/main/scala/io/gearpump/cluster/main/Master.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/io/gearpump/cluster/main/Master.scala b/daemon/src/main/scala/io/gearpump/cluster/main/Master.scala index 6a4ac07..eac1c54 100644 --- a/daemon/src/main/scala/io/gearpump/cluster/main/Master.scala +++ b/daemon/src/main/scala/io/gearpump/cluster/main/Master.scala @@ -33,7 +33,8 @@ import com.typesafe.config.ConfigValueFactory import org.slf4j.Logger import io.gearpump.cluster.ClusterConfig -import io.gearpump.cluster.master.{Master => MasterActor} +import io.gearpump.cluster.master.{Master => MasterActor, MasterNode} +import io.gearpump.cluster.master.Master.MasterListUpdated import io.gearpump.util.Constants._ import io.gearpump.util.LogUtil.ProcessType import io.gearpump.util.{AkkaApp, Constants, LogUtil} @@ -176,15 +177,17 @@ class MasterWatcher(role: String) extends Actor with ActorLogging { context.become(waitForShutdown) self ! MasterWatcher.Shutdown } else { - context.actorOf(Props(classOf[MasterActor]), MASTER) - context.become(waitForClusterEvent) + val master = context.actorOf(Props(classOf[MasterActor]), MASTER) + notifyMasterMembersChange(master) + context.become(waitForClusterEvent(master)) } } } - def waitForClusterEvent: Receive = { + def waitForClusterEvent(master: ActorRef): Receive = { case MemberUp(m) if matchingRole(m) => { membersByAge += m + notifyMasterMembersChange(master) } case mEvent: MemberEvent if (mEvent.isInstanceOf[MemberExited] || mEvent.isInstanceOf[MemberRemoved]) && matchingRole(mEvent.member) => { @@ -196,10 +199,20 @@ class MasterWatcher(role: String) extends Actor with ActorLogging { s"shutting down...${membersByAge.iterator.mkString(",")}") context.become(waitForShutdown) self ! MasterWatcher.Shutdown + } else { + notifyMasterMembersChange(master) } } } + private def notifyMasterMembersChange(master: ActorRef): Unit = { + val masters = membersByAge.toList.map{ member => + MasterNode(member.address.host.getOrElse("Unknown-Host"), + member.address.port.getOrElse(0)) + } + master ! MasterListUpdated(masters) + } + def waitForShutdown: Receive = { case MasterWatcher.Shutdown => { cluster.unsubscribe(self) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/13ae50fb/daemon/src/main/scala/io/gearpump/cluster/master/Master.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/io/gearpump/cluster/master/Master.scala b/daemon/src/main/scala/io/gearpump/cluster/master/Master.scala index 0203237..0dfa381 100644 --- a/daemon/src/main/scala/io/gearpump/cluster/master/Master.scala +++ b/daemon/src/main/scala/io/gearpump/cluster/master/Master.scala @@ -86,6 +86,12 @@ private[cluster] class Master extends Actor with Stash { private val hostPort = HostPort(ActorUtil.getSystemAddress(context.system).hostPort) + // Maintain the list of active masters. + private var masters: List[MasterNode] = { + // Add myself into the list of initial masters. + List(MasterNode(hostPort.host, hostPort.port)) + } + val metricsEnabled = systemConfig.getBoolean(GEARPUMP_METRIC_ENABLED) val getHistoryMetricsConfig = HistoryMetricsConfig(systemConfig) @@ -124,6 +130,7 @@ private[cluster] class Master extends Actor with Stash { def receiveHandler: Receive = workerMsgHandler orElse appMasterMsgHandler orElse + onMasterListChange orElse clientMsgHandler orElse metricsService orElse jarStoreService orElse @@ -195,7 +202,7 @@ private[cluster] class Master extends Actor with Stash { val masterDescription = MasterSummary( MasterNode(hostPort.host, hostPort.port), - getMasterClusterList.map(hostPort => MasterNode(hostPort.host, hostPort.port)), + masters, aliveFor, logFileDir, jarStoreRootPath, @@ -212,21 +219,13 @@ private[cluster] class Master extends Actor with Stash { appManager forward invalidAppMaster } - private def getMasterClusterList: List[HostPort] = { - val cluster = systemConfig.getStringList(GEARPUMP_CLUSTER_MASTERS) - .asScala.map(HostPort(_)).toList - - if (cluster.isEmpty) { + import scala.util.{Failure, Success} - // Add myself into the list if it is a single node cluster - List(hostPort) - } else { - cluster - } + def onMasterListChange: Receive = { + case MasterListUpdated(masters: List[MasterNode]) => + this.masters = masters } - import scala.util.{Failure, Success} - def clientMsgHandler: Receive = { case app: SubmitApplication => LOG.debug(s"Receive from client, SubmitApplication $app") @@ -303,6 +302,9 @@ object Master { case class MasterInfo(master: ActorRef, startTime: Long = 0L) + /** Notify the subscriber that master actor list has been updated */ + case class MasterListUpdated(masters: List[MasterNode]) + object MasterInfo { def empty: MasterInfo = MasterInfo(null) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/13ae50fb/project/Build.scala ---------------------------------------------------------------------- diff --git a/project/Build.scala b/project/Build.scala index ad775bd..b9e6f94 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -264,7 +264,7 @@ object Build extends sbt.Build { lazy val daemon = Project( id = "gearpump-daemon", base = file("daemon"), - settings = commonSettings ++ noPublish ++ daemonDependencies + settings = commonSettings ++ daemonDependencies ) dependsOn(core % "test->test; compile->compile", cgroup % "test->test; compile->compile") lazy val cgroup = Project(
