Repository: incubator-gearpump Updated Branches: refs/heads/master 504bcf39c -> 2334c19ee
[GEARPUMP-379] Send terminated status to client when an application i⦠â¦s terminated Author: manuzhang <[email protected]> Closes #249 from manuzhang/gearpump_379. Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/2334c19e Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/2334c19e Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/2334c19e Branch: refs/heads/master Commit: 2334c19ee4309e8c3e4814297ef466b0b73758a3 Parents: 504bcf3 Author: manuzhang <[email protected]> Authored: Sat Jun 9 21:42:29 2018 +0800 Committer: manuzhang <[email protected]> Committed: Sat Jun 9 21:42:41 2018 +0800 ---------------------------------------------------------------------- .../apache/gearpump/cluster/ClusterMessage.scala | 6 +++++- .../cluster/client/RunningApplication.scala | 16 +++++++++++----- .../gearpump/cluster/master/AppManager.scala | 17 +++++++++++------ 3 files changed, 27 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2334c19e/core/src/main/scala/org/apache/gearpump/cluster/ClusterMessage.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/ClusterMessage.scala b/core/src/main/scala/org/apache/gearpump/cluster/ClusterMessage.scala index 8a067b5..eb7755c 100644 --- a/core/src/main/scala/org/apache/gearpump/cluster/ClusterMessage.scala +++ b/core/src/main/scala/org/apache/gearpump/cluster/ClusterMessage.scala @@ -159,11 +159,15 @@ object MasterToClient { /** Return the last error of this streaming application job */ case class LastFailure(time: MilliSeconds, error: String) - sealed trait ApplicationResult + sealed trait ApplicationResult { + def appId: Int + } case class ApplicationSucceeded(appId: Int) extends ApplicationResult case class ApplicationFailed(appId: Int, error: Throwable) extends ApplicationResult + + case class ApplicationTerminated(appId: Int) extends ApplicationResult } object AppMasterToMaster { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2334c19e/core/src/main/scala/org/apache/gearpump/cluster/client/RunningApplication.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/client/RunningApplication.scala b/core/src/main/scala/org/apache/gearpump/cluster/client/RunningApplication.scala index 103df01..7c9db9a 100644 --- a/core/src/main/scala/org/apache/gearpump/cluster/client/RunningApplication.scala +++ b/core/src/main/scala/org/apache/gearpump/cluster/client/RunningApplication.scala @@ -56,11 +56,17 @@ class RunningApplication(val appId: Int, master: ActorRef, timeout: Timeout) { def waitUntilFinish(duration: Duration): Unit = { val result = ActorUtil.askActor[ApplicationResult](master, RegisterAppResultListener(appId), new Timeout(duration.getSeconds, TimeUnit.SECONDS)) - result match { - case failed: ApplicationFailed => - throw failed.error - case _ => - LOG.info(s"Application $appId succeeded") + if (result.appId == appId) { + result match { + case failed: ApplicationFailed => + throw failed.error + case _: ApplicationSucceeded => + LOG.info(s"Application $appId succeeded") + case _: ApplicationTerminated => + LOG.info(s"Application $appId terminated") + } + } else { + LOG.warn(s"Received unexpected result $result for application $appId") } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2334c19e/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala b/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala index 0a42e3a..0a32d9d 100644 --- a/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala +++ b/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala @@ -241,17 +241,14 @@ private[cluster] class AppManager(kvService: ActorRef, launcher: AppMasterLaunch case [email protected] => killAppMaster(appId, appRuntimeInfo.worker) updatedStatus = appRuntimeInfo.onFinalStatus(timeStamp, succeeded) - appResultListeners.getOrElse(appId, List.empty).foreach { client => - client ! ApplicationSucceeded(appId) - } + sendAppResultToListeners(appId, ApplicationSucceeded(appId)) case [email protected] => killAppMaster(appId, appRuntimeInfo.worker) updatedStatus = appRuntimeInfo.onFinalStatus(timeStamp, failed) - appResultListeners.getOrElse(appId, List.empty).foreach { client => - client ! ApplicationFailed(appId, error) - } + sendAppResultToListeners(appId, ApplicationFailed(appId, error)) case [email protected] => updatedStatus = appRuntimeInfo.onFinalStatus(timeStamp, terminated) + sendAppResultToListeners(appId, ApplicationTerminated(appId)) case status => LOG.error(s"App $appId should not change it's status to $status") } @@ -271,6 +268,14 @@ private[cluster] class AppManager(kvService: ActorRef, launcher: AppMasterLaunch } } + private def sendAppResultToListeners(appId: Int, result: ApplicationResult): Unit = { + appResultListeners.get(appId).foreach { + _.foreach { client => + client ! result + } + } + } + def appDataStoreService: Receive = { case SaveAppData(appId, key, value) => val client = sender()
