[
https://issues.apache.org/jira/browse/GEARPUMP-265?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15833374#comment-15833374
]
ASF GitHub Bot commented on GEARPUMP-265:
-----------------------------------------
Github user manuzhang commented on a diff in the pull request:
https://github.com/apache/incubator-gearpump/pull/134#discussion_r97216883
--- Diff:
core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala ---
@@ -217,40 +206,56 @@ private[cluster] class AppManager(kvService:
ActorRef, launcher: AppMasterLaunch
LOG.error(failed.reason)
}
- private def getAppMasterStatus(appId: Int): AppMasterStatus = {
- if (activeAppMasters.contains(appId)) {
- AppMasterActive
- } else if (deadAppMasters.contains(appId)) {
- AppMasterInActive
- } else if (appMasterRegistry.contains(appId)) {
- AppMasterPending
- } else {
- AppMasterNonExist
- }
- }
+ def appMasterMessage: Receive = {
+ case RegisterAppMaster(appId, appMaster, workerInfo) =>
+ val appInfo = applicationRegistry.get(appId)
+ appInfo match {
+ case Some(info) =>
+ LOG.info(s"Register AppMaster for app: $appId")
+ val updatedInfo = info.onAppMasterRegister(appMaster,
workerInfo.ref)
+ context.watch(appMaster)
+ applicationRegistry += appId -> updatedInfo
+ kvService ! PutKV(MASTER_GROUP, MASTER_STATE,
MasterState(nextAppId, applicationRegistry))
+ sender ! AppMasterRegistered(appId)
+ case None =>
+ LOG.error(s"Can not find submitted application $appId")
+ }
- private def shutDownExecutorTimeOut(): Unit = {
- LOG.error(s"Shut down executor time out")
- }
+ case ApplicationStatusChanged(appId, newStatus, timeStamp, error) =>
+ applicationRegistry.get(appId) match {
+ case Some(appRuntimeInfo) =>
+ var updatedStatus: ApplicationRuntimeInfo = null
+ LOG.info(s"Application $appId change to ${newStatus.toString} at
$timeStamp")
+ newStatus match {
+ case ApplicationStatus.Active =>
+ updatedStatus = appRuntimeInfo.onActived(timeStamp)
+ sender ! AppMasterActivated(appId)
+ case finished@ApplicationStatus.Finished =>
+ killAppMasterExecutor(appId, appRuntimeInfo.worker)
+ updatedStatus = appRuntimeInfo.onTerminalStatus(timeStamp,
finished)
+ appResultListeners.getOrElse(appId, List.empty).foreach{
client =>
+ client ! ApplicationFinished(appId)
+ }
+ case failed@ApplicationStatus.Failed =>
+ killAppMasterExecutor(appId, appRuntimeInfo.worker)
+ updatedStatus = appRuntimeInfo.onTerminalStatus(timeStamp,
failed)
+ appResultListeners.getOrElse(appId, List.empty).foreach{
client =>
+ client ! ApplicationFailed(appId, error)
+ }
+ case terminated@ApplicationStatus.Terminated =>
+ updatedStatus = appRuntimeInfo.onTerminalStatus(timeStamp,
terminated)
+ case _ =>
--- End diff --
What is this for? Any comments?
> remove AppMasterRuntimeInfo from AppMasterContext
> -------------------------------------------------
>
> Key: GEARPUMP-265
> URL: https://issues.apache.org/jira/browse/GEARPUMP-265
> Project: Apache Gearpump
> Issue Type: Sub-task
> Affects Versions: 0.8.2
> Reporter: Huafeng Wang
>
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)