Repository: incubator-gearpump
Updated Branches:
  refs/heads/master e78d6cca5 -> ec04cf766


fix GEARPUMP-83, show application pending on worker down

Changes include:

1. add an `activeAppMasters` set to `MasterState`, and change `deadAppMasters`
to a set of appIds. Now all existing applications will be in
`appMasterRegistry` and in **pending** status by default. Those in
`activeAppMasters` will have **active** status and those in `deadAppMasters`
**inactive** status.
2. add a status column to the "apps" page on the dashboard. The status will be
one of pending, active, or inactive.
3. Internally, `TaskManager` will send an `ApplicationReady` message to
`AppMaster` on reaching the ApplicationReady state (with a DAG version larger
than -1). In turn, `AppMaster` will send an `ActivateAppMaster(appId)` message
to `Master` (forwarded to `AppManager`). Then the `appId` is added to
`activeAppMasters`. It is removed from `activeAppMasters` when an application
is either killed or recovered.
4. add unit tests and fix a bunch of warnings (typos, suspicious shadowing, etc.)
5. fix integration test
6. allow killing pending applications

Author: manuzhang <[email protected]>

Closes #31 from manuzhang/appmaster_status.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/ec04cf76
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/ec04cf76
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/ec04cf76

Branch: refs/heads/master
Commit: ec04cf7666020ba15764611ac872344549f5200c
Parents: e78d6cc
Author: manuzhang <[email protected]>
Authored: Fri Jun 3 09:22:05 2016 +0800
Committer: manuzhang <[email protected]>
Committed: Fri Jun 3 09:22:05 2016 +0800

----------------------------------------------------------------------
 .../gearpump/cluster/ClusterMessage.scala       |  10 ++
 .../appmaster/MasterConnectionKeeper.scala      |   3 +-
 .../gearpump/cluster/master/AppManager.scala    | 152 ++++++++++---------
 .../apache/gearpump/cluster/master/Master.scala |   3 +-
 .../cluster/master/AppManagerSpec.scala         |  11 +-
 .../gearpump/integrationtest/TestSpecBase.scala |  14 +-
 .../checklist/RestServiceSpec.scala             |   6 +-
 .../minicluster/RestClient.scala                |   7 +-
 services/dashboard/services/models/models.js    |   1 +
 services/dashboard/views/apps/apps.js           |   6 +-
 .../streaming/appmaster/AppMaster.scala         |  39 +++--
 .../streaming/appmaster/TaskManager.scala       |   8 +-
 .../streaming/appmaster/TaskManagerSpec.scala   |  19 ++-
 13 files changed, 168 insertions(+), 111 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ec04cf76/core/src/main/scala/org/apache/gearpump/cluster/ClusterMessage.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/gearpump/cluster/ClusterMessage.scala 
b/core/src/main/scala/org/apache/gearpump/cluster/ClusterMessage.scala
index e6ab13b..8aa84b5 100644
--- a/core/src/main/scala/org/apache/gearpump/cluster/ClusterMessage.scala
+++ b/core/src/main/scala/org/apache/gearpump/cluster/ClusterMessage.scala
@@ -162,6 +162,12 @@ trait AppMasterRegisterData
 object AppMasterToMaster {
 
   /**
+   * Activate the AppMaster when an application is ready to run.
+   * @param appId application id
+   */
+  case class ActivateAppMaster(appId: Int)
+  
+  /**
    * Register an AppMaster by providing a ActorRef, and registerData
    * @param registerData The registerData is provided by Master when starting 
the app master.
    *                     App master should return the registerData back to 
master.
@@ -246,10 +252,14 @@ object MasterToAppMaster {
   /** Master confirm reception of RegisterAppMaster message */
   case class AppMasterRegistered(appId: Int)
 
+  /** Master confirm reception of ActivateAppMaster message */
+  case class AppMasterActivated(appId: Int)
+
   /** Shutdown the application job */
   case object ShutdownAppMaster
 
   type AppMasterStatus = String
+  val AppMasterPending: AppMasterStatus = "pending"
   val AppMasterActive: AppMasterStatus = "active"
   val AppMasterInActive: AppMasterStatus = "inactive"
   val AppMasterNonExist: AppMasterStatus = "nonexist"

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ec04cf76/core/src/main/scala/org/apache/gearpump/cluster/appmaster/MasterConnectionKeeper.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/gearpump/cluster/appmaster/MasterConnectionKeeper.scala
 
b/core/src/main/scala/org/apache/gearpump/cluster/appmaster/MasterConnectionKeeper.scala
index fd2d11a..e3fbe49 100644
--- 
a/core/src/main/scala/org/apache/gearpump/cluster/appmaster/MasterConnectionKeeper.scala
+++ 
b/core/src/main/scala/org/apache/gearpump/cluster/appmaster/MasterConnectionKeeper.scala
@@ -46,7 +46,6 @@ class MasterConnectionKeeper(
   import context.dispatcher
 
   private val LOG = LogUtil.getLogger(getClass)
-  private var master: ActorRef = null
 
   // Subscribe self to masterProxy,
   masterProxy ! WatchMaster(self)
@@ -72,7 +71,7 @@ class MasterConnectionKeeper(
 
   def masterLivenessListener: Receive = {
     case MasterRestarted =>
-      LOG.info("Master restarted, re-registering appmaster....")
+      LOG.info("Master restarted, re-registering AppMaster....")
       context.become(waitMasterToConfirm(registerAppMaster))
     case MasterStopped =>
       LOG.info("Master is dead, killing this AppMaster....")

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ec04cf76/daemon/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala
----------------------------------------------------------------------
diff --git 
a/daemon/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala 
b/daemon/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala
index 435b83e..9a3a119 100644
--- a/daemon/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala
+++ b/daemon/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala
@@ -47,7 +47,7 @@ private[cluster] class AppManager(kvService: ActorRef, 
launcher: AppMasterLaunch
 
   private val LOG: Logger = LogUtil.getLogger(getClass)
 
-  private val executorId: Int = APPMASTER_DEFAULT_EXECUTOR_ID
+  private val EXECUTOR_ID: Int = APPMASTER_DEFAULT_EXECUTOR_ID
   private val appMasterMaxRetries: Int = 5
   private val appMasterRetryTimeRange: Duration = 20.seconds
 
@@ -55,13 +55,17 @@ private[cluster] class AppManager(kvService: ActorRef, 
launcher: AppMasterLaunch
   implicit val executionContext = context.dispatcher
 
   // Next available appId
-  private var appId: Int = 1
+  private var nextAppId: Int = 1
 
-  // From appid to appMaster data
+  // From appId to appMaster data
+  // Applications not in activeAppMasters or deadAppMasters are in pending 
status
   private var appMasterRegistry = Map.empty[Int, (ActorRef, 
AppMasterRuntimeInfo)]
 
-  // Dead appmaster list
-  private var deadAppMasters = Map.empty[Int, (ActorRef, AppMasterRuntimeInfo)]
+  // Active appMaster list where applications are in active status
+  private var activeAppMasters = Set.empty[Int]
+
+  // Dead appMaster list where applications are in inactive status
+  private var deadAppMasters = Set.empty[Int]
 
   private var appMasterRestartPolicies = Map.empty[Int, RestartPolicy]
 
@@ -74,7 +78,8 @@ private[cluster] class AppManager(kvService: ActorRef, 
launcher: AppMasterLaunch
     case GetKVSuccess(_, result) =>
       val masterState = result.asInstanceOf[MasterState]
       if (masterState != null) {
-        this.appId = masterState.maxId + 1
+        this.nextAppId = masterState.maxId + 1
+        this.activeAppMasters = masterState.activeAppMasters
         this.deadAppMasters = masterState.deadAppMasters
         this.appMasterRegistry = masterState.appMasterRegistry
       }
@@ -97,20 +102,20 @@ private[cluster] class AppManager(kvService: ActorRef, 
launcher: AppMasterLaunch
 
   def clientMsgHandler: Receive = {
     case SubmitApplication(app, jar, username) =>
-      LOG.info(s"Submit Application ${app.name}($appId) by $username...")
-      val client = sender
+      LOG.info(s"Submit Application ${app.name}($nextAppId) by $username...")
+      val client = sender()
       if (applicationNameExist(app.name)) {
         client ! SubmitApplicationResult(Failure(
           new Exception(s"Application name ${app.name} already existed")))
       } else {
-        context.actorOf(launcher.props(appId, executorId, app, jar, username, 
context.parent,
-          Some(client)), s"launcher${appId}_${Util.randInt()}")
+        context.actorOf(launcher.props(nextAppId, EXECUTOR_ID, app, jar, 
username, context.parent,
+          Some(client)), s"launcher${nextAppId}_${Util.randInt()}")
 
-        val appState = new ApplicationState(appId, app.name, 0, app, jar, 
username, null)
-        appMasterRestartPolicies += appId ->
+        val appState = new ApplicationState(nextAppId, app.name, 0, app, jar, 
username, null)
+        appMasterRestartPolicies += nextAppId ->
           new RestartPolicy(appMasterMaxRetries, appMasterRetryTimeRange)
-        kvService ! PutKV(appId.toString, APP_STATE, appState)
-        appId += 1
+        kvService ! PutKV(nextAppId.toString, APP_STATE, appState)
+        nextAppId += 1
       }
 
     case RestartApplication(appId) =>
@@ -136,22 +141,25 @@ private[cluster] class AppManager(kvService: ActorRef, 
launcher: AppMasterLaunch
 
     case ShutdownApplication(appId) =>
       LOG.info(s"App Manager Shutting down application $appId")
-      val (_, info) = appMasterRegistry.getOrElse(appId, (null, null))
-      Option(info) match {
+      val (_, appInfo) = appMasterRegistry.get(appId)
+        .filter { case (_, info) => !deadAppMasters.contains(info.appId)}
+        .getOrElse((null, null))
+      Option(appInfo) match {
         case Some(info) =>
           val worker = info.worker
           val workerPath = Option(worker).map(_.path).orNull
-          LOG.info(s"Shutdown AppMaster at ${workerPath}, appId: $appId, 
executorId: $executorId")
+          LOG.info(s"Shutdown AppMaster at $workerPath, appId: $appId, 
executorId: $EXECUTOR_ID")
           cleanApplicationData(appId)
-          val shutdown = ShutdownExecutor(appId, executorId,
+          val shutdown = ShutdownExecutor(appId, EXECUTOR_ID,
             s"AppMaster $appId shutdown requested by master...")
           sendMsgWithTimeOutCallBack(worker, shutdown, 30000, 
shutDownExecutorTimeOut())
           sender ! ShutdownApplicationResult(Success(appId))
         case None =>
-          val errorMsg = s"Failed to find regisration information for appId: 
$appId"
+          val errorMsg = s"Failed to find registration information for appId: 
$appId"
           LOG.error(errorMsg)
           sender ! ShutdownApplicationResult(Failure(new Exception(errorMsg)))
       }
+
     case ResolveAppId(appId) =>
       val (appMaster, _) = appMasterRegistry.getOrElse(appId, (null, null))
       if (null != appMaster) {
@@ -159,6 +167,7 @@ private[cluster] class AppManager(kvService: ActorRef, 
launcher: AppMasterLaunch
       } else {
         sender ! ResolveAppIdResult(Failure(new Exception(s"Can not find 
Application: $appId")))
       }
+
     case AppMastersDataRequest =>
       var appMastersData = collection.mutable.ListBuffer[AppMasterData]()
       appMasterRegistry.foreach(pair => {
@@ -166,31 +175,19 @@ private[cluster] class AppManager(kvService: ActorRef, 
launcher: AppMasterLaunch
         val appMasterPath = ActorUtil.getFullPath(context.system, 
appMaster.path)
         val workerPath = Option(info.worker).map(worker =>
           ActorUtil.getFullPath(context.system, worker.path))
+        val status = getAppMasterStatus(id)
         appMastersData += AppMasterData(
-          AppMasterActive, id, info.appName, appMasterPath, workerPath.orNull,
-          info.submissionTime, info.startTime, info.finishTime, info.user)
-      })
-
-      deadAppMasters.foreach(pair => {
-        val (id, (appMaster: ActorRef, info: AppMasterRuntimeInfo)) = pair
-        val appMasterPath = ActorUtil.getFullPath(context.system, 
appMaster.path)
-        val workerPath = Option(info.worker).map(worker =>
-          ActorUtil.getFullPath(context.system, worker.path))
-
-        appMastersData += AppMasterData(
-          AppMasterInActive, id, info.appName, appMasterPath, 
workerPath.orNull,
+          status, id, info.appName, appMasterPath, workerPath.orNull,
           info.submissionTime, info.startTime, info.finishTime, info.user)
       })
 
       sender ! AppMastersData(appMastersData.toList)
+
     case QueryAppMasterConfig(appId) =>
       val config =
         if (appMasterRegistry.contains(appId)) {
           val (_, info) = appMasterRegistry(appId)
           info.config
-        } else if (deadAppMasters.contains(appId)) {
-          val (_, info) = deadAppMasters(appId)
-          info.config
         } else {
           null
         }
@@ -198,28 +195,19 @@ private[cluster] class AppManager(kvService: ActorRef, 
launcher: AppMasterLaunch
 
     case appMasterDataRequest: AppMasterDataRequest =>
       val appId = appMasterDataRequest.appId
-      val (appStatus, appMaster, info) =
-        if (appMasterRegistry.contains(appId)) {
-          val (appMaster, info) = appMasterRegistry(appId)
-          (AppMasterActive, appMaster, info)
-        } else if (deadAppMasters.contains(appId)) {
-          val (appMaster, info) = deadAppMasters(appId)
-          (AppMasterInActive, appMaster, info)
-        } else {
-          (AppMasterNonExist, null, null)
-        }
+      val appStatus = getAppMasterStatus(appId)
 
       appStatus match {
-        case AppMasterActive | AppMasterInActive =>
+        case AppMasterNonExist =>
+          sender ! AppMasterData(AppMasterNonExist)
+        case _ =>
+          val (appMaster, info) = appMasterRegistry(appId)
           val appMasterPath = ActorUtil.getFullPath(context.system, 
appMaster.path)
           val workerPath = Option(info.worker).map(
             worker => ActorUtil.getFullPath(context.system, 
worker.path)).orNull
           sender ! AppMasterData(
             appStatus, appId, info.appName, appMasterPath, workerPath,
             info.submissionTime, info.startTime, info.finishTime, info.user)
-
-        case AppMasterNonExist =>
-          sender ! AppMasterData(AppMasterNonExist)
       }
   }
 
@@ -230,6 +218,18 @@ private[cluster] class AppManager(kvService: ActorRef, 
launcher: AppMasterLaunch
       LOG.error(failed.reason)
   }
 
+  private def getAppMasterStatus(appId: Int): AppMasterStatus = {
+    if (activeAppMasters.contains(appId)) {
+      AppMasterActive
+    } else if (deadAppMasters.contains(appId)) {
+      AppMasterInActive
+    } else if (appMasterRegistry.contains(appId)) {
+      AppMasterPending
+    } else {
+      AppMasterNonExist
+    }
+  }
+
   private def shutDownExecutorTimeOut(): Unit = {
     LOG.error(s"Shut down executor time out")
   }
@@ -239,17 +239,24 @@ private[cluster] class AppManager(kvService: ActorRef, 
launcher: AppMasterLaunch
       val startTime = System.currentTimeMillis()
       val register = registerBack.copy(startTime = startTime)
 
-      LOG.info(s"Register AppMaster for app: ${register.appId} $register")
+      LOG.info(s"Register AppMaster for app: ${register.appId}, $register")
       context.watch(appMaster)
       appMasterRegistry += register.appId -> (appMaster, register)
       kvService ! PutKV(MASTER_GROUP, MASTER_STATE,
-        MasterState(appId, appMasterRegistry, deadAppMasters))
+        MasterState(nextAppId, appMasterRegistry, activeAppMasters, 
deadAppMasters))
       sender ! AppMasterRegistered(register.appId)
+
+    case ActivateAppMaster(appId) =>
+      LOG.info(s"Activate AppMaster for app $appId")
+      activeAppMasters += appId
+      kvService ! PutKV(MASTER_GROUP, MASTER_STATE,
+        MasterState(this.nextAppId, appMasterRegistry, activeAppMasters, 
deadAppMasters))
+      sender ! AppMasterActivated(appId)
   }
 
   def appDataStoreService: Receive = {
     case SaveAppData(appId, key, value) =>
-      val client = sender
+      val client = sender()
       (kvService ? PutKV(appId.toString, key, 
value)).asInstanceOf[Future[PutKVResult]].map {
         case PutKVSuccess =>
           client ! AppDataSaved
@@ -257,7 +264,7 @@ private[cluster] class AppManager(kvService: ActorRef, 
launcher: AppMasterLaunch
           client ! SaveAppDataFailed
       }
     case GetAppData(appId, key) =>
-      val client = sender
+      val client = sender()
       (kvService ? GetKV(appId.toString, 
key)).asInstanceOf[Future[GetKVResult]].map {
         case GetKVSuccess(privateKey, value) =>
           client ! GetAppDataResult(key, value)
@@ -268,9 +275,8 @@ private[cluster] class AppManager(kvService: ActorRef, 
launcher: AppMasterLaunch
 
   def terminationWatch: Receive = {
     case terminate: Terminated =>
-      terminate.getAddressTerminated()
-      LOG.info(s"AppMaster(${terminate.actor.path}) is terminiated, " +
-        s"network down: ${terminate.getAddressTerminated()}")
+      LOG.info(s"AppMaster(${terminate.actor.path}) is terminated, " +
+        s"network down: ${terminate.getAddressTerminated}")
 
       // Now we assume that the only normal way to stop the application is 
submitting a
       // ShutdownApplication request
@@ -300,7 +306,10 @@ private[cluster] class AppManager(kvService: ActorRef, 
launcher: AppMasterLaunch
       val appId = state.appId
       if (appMasterRestartPolicies.get(appId).get.allowRestart) {
         LOG.info(s"AppManager Recovering Application $appId...")
-        context.actorOf(launcher.props(appId, executorId, state.app, 
state.jar, state.username,
+        activeAppMasters -= appId
+        kvService ! PutKV(MASTER_GROUP, MASTER_STATE,
+          MasterState(this.nextAppId, appMasterRegistry, activeAppMasters, 
deadAppMasters))
+        context.actorOf(launcher.props(appId, EXECUTOR_ID, state.app, 
state.jar, state.username,
           context.parent, None), s"launcher${appId}_${Util.randInt()}")
       } else {
         LOG.error(s"Application $appId failed too many times")
@@ -310,22 +319,26 @@ private[cluster] class AppManager(kvService: ActorRef, 
launcher: AppMasterLaunch
   case class RecoverApplication(applicationStatus: ApplicationState)
 
   private def cleanApplicationData(appId: Int): Unit = {
-    // Add the dead app to dead appMaster
-    appMasterRegistry.get(appId).foreach { pair =>
-      val (appMasterActor, info) = pair
-      deadAppMasters += appId -> (appMasterActor, info.copy(
-        finishTime = System.currentTimeMillis()))
+    if (appMasterRegistry.contains(appId)) {
+      // Add the dead app to dead appMasters
+      deadAppMasters += appId
+      // Remove the dead app from active appMasters
+      activeAppMasters -= appId
+
+      appMasterRegistry += appId -> {
+        val (ref, info) = appMasterRegistry(appId)
+        (ref, info.copy(finishTime = System.currentTimeMillis()))
+      }
+      kvService ! PutKV(MASTER_GROUP, MASTER_STATE,
+        MasterState(this.nextAppId, appMasterRegistry, activeAppMasters, 
deadAppMasters))
+      kvService ! DeleteKVGroup(appId.toString)
     }
-
-    appMasterRegistry -= appId
-
-    kvService ! PutKV(MASTER_GROUP, MASTER_STATE,
-      MasterState(this.appId, appMasterRegistry, deadAppMasters))
-    kvService ! DeleteKVGroup(appId.toString)
   }
 
   private def applicationNameExist(appName: String): Boolean = {
-    appMasterRegistry.values.exists(_._2.appName == appName)
+    appMasterRegistry.values.exists { case (_, info) =>
+      info.appName == appName && !deadAppMasters.contains(info.appId)
+    }
   }
 }
 
@@ -337,5 +350,6 @@ object AppManager {
   case class MasterState(
       maxId: Int,
       appMasterRegistry: Map[Int, (ActorRef, AppMasterRuntimeInfo)],
-      deadAppMasters: Map[Int, (ActorRef, AppMasterRuntimeInfo)])
+      activeAppMasters: Set[Int],
+      deadAppMasters: Set[Int])
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ec04cf76/daemon/src/main/scala/org/apache/gearpump/cluster/master/Master.scala
----------------------------------------------------------------------
diff --git 
a/daemon/src/main/scala/org/apache/gearpump/cluster/master/Master.scala 
b/daemon/src/main/scala/org/apache/gearpump/cluster/master/Master.scala
index 1536a23..762cf27 100644
--- a/daemon/src/main/scala/org/apache/gearpump/cluster/master/Master.scala
+++ b/daemon/src/main/scala/org/apache/gearpump/cluster/master/Master.scala
@@ -187,8 +187,9 @@ private[cluster] class Master extends Actor with Stash {
     case request: RequestResource =>
       scheduler forward request
     case registerAppMaster: RegisterAppMaster =>
-      // Forward to appManager
       appManager forward registerAppMaster
+    case activateAppMaster: ActivateAppMaster =>
+      appManager forward activateAppMaster
     case save: SaveAppData =>
       appManager forward save
     case get: GetAppData =>

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ec04cf76/daemon/src/test/scala/org/apache/gearpump/cluster/master/AppManagerSpec.scala
----------------------------------------------------------------------
diff --git 
a/daemon/src/test/scala/org/apache/gearpump/cluster/master/AppManagerSpec.scala 
b/daemon/src/test/scala/org/apache/gearpump/cluster/master/AppManagerSpec.scala
index ae0ebcd..58e3593 100644
--- 
a/daemon/src/test/scala/org/apache/gearpump/cluster/master/AppManagerSpec.scala
+++ 
b/daemon/src/test/scala/org/apache/gearpump/cluster/master/AppManagerSpec.scala
@@ -52,20 +52,23 @@ class AppManagerSpec extends FlatSpec with Matchers with 
BeforeAndAfterEach with
     appManager = getActorSystem.actorOf(Props(new AppManager(kvService.ref,
       new DummyAppMasterLauncherFactory(appLauncher))))
     kvService.expectMsgType[GetKV]
-    kvService.reply(GetKVSuccess(MASTER_STATE, MasterState(0, Map.empty, 
Map.empty)))
+    kvService.reply(GetKVSuccess(MASTER_STATE, MasterState(0, Map.empty, 
Set.empty, Set.empty)))
   }
 
   override def afterEach(): Unit = {
     shutdownActorSystem()
   }
 
-  "AppManager" should "handle appmaster message correctly" in {
+  "AppManager" should "handle AppMaster message correctly" in {
     val appMaster = TestProbe()(getActorSystem)
-    val worker = TestProbe()(getActorSystem)
+    val appId = 1
 
-    val register = RegisterAppMaster(appMaster.ref, AppMasterRuntimeInfo(0, 
"appName"))
+    val register = RegisterAppMaster(appMaster.ref, 
AppMasterRuntimeInfo(appId, "appName"))
     appMaster.send(appManager, register)
     appMaster.expectMsgType[AppMasterRegistered]
+
+    appMaster.send(appManager, ActivateAppMaster(appId))
+    appMaster.expectMsgType[AppMasterActivated]
   }
 
   "DataStoreService" should "support Put and Get" in {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ec04cf76/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/TestSpecBase.scala
----------------------------------------------------------------------
diff --git 
a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/TestSpecBase.scala
 
b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/TestSpecBase.scala
index dabcc71..f480475 100644
--- 
a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/TestSpecBase.scala
+++ 
b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/TestSpecBase.scala
@@ -63,7 +63,7 @@ trait TestSpecBase
 
     assert(cluster != null, "Configure MiniCluster properly in suite spec")
     cluster.isAlive shouldBe true
-    restClient.listRunningApps().isEmpty shouldBe true
+    restClient.listPendingOrRunningApps().isEmpty shouldBe true
     LOGGER.debug(s">### 
=============================================================")
     LOGGER.debug(s">###2 Start test: ${td.name}\n")
   }
@@ -77,16 +77,16 @@ trait TestSpecBase
       LOGGER.info("Will restart the cluster for next test case")
       cluster.restart()
     } else {
-      restClient.listRunningApps().foreach(app => {
+      restClient.listPendingOrRunningApps().foreach(app => {
         commandLineClient.killApp(app.appId) shouldBe true
       })
     }
   }
 
-  def expectAppIsRunning(appId: Int, expectedAppName: String): AppMasterData = 
{
-    val app = restClient.queryApp(appId)
-    app.status shouldEqual MasterToAppMaster.AppMasterActive
-    app.appName shouldEqual expectedAppName
-    app
+  def expectAppIsRunning(appId: Int, expectedAppName: String): Unit = {
+    Util.retryUntil(() => {
+      val app = restClient.queryApp(appId)
+      app.status == MasterToAppMaster.AppMasterActive && app.appName == 
expectedAppName
+    }, s"$expectedAppName is running")
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ec04cf76/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/RestServiceSpec.scala
----------------------------------------------------------------------
diff --git 
a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/RestServiceSpec.scala
 
b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/RestServiceSpec.scala
index 2f5bb64..992533b 100644
--- 
a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/RestServiceSpec.scala
+++ 
b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/RestServiceSpec.scala
@@ -162,7 +162,7 @@ class RestServiceSpec extends TestSpecBase {
       killAppAndVerify(appId)
     }
 
-    "should fail when attempting to kill a stopped application" in {
+    "fail when attempting to kill a stopped application" in {
       // setup
       val appId = restClient.getNextAvailableAppId()
       val submissionSucess = restClient.submitApp(wordCountJar, 
cluster.getWorkerHosts.length)
@@ -175,7 +175,7 @@ class RestServiceSpec extends TestSpecBase {
       success shouldBe false
     }
 
-    "should fail when attempting to kill a non-exist application" in {
+    "fail when attempting to kill a non-exist application" in {
       // setup
       val freeAppId = restClient.listApps().length + 1
 
@@ -342,6 +342,8 @@ class RestServiceSpec extends TestSpecBase {
       val killedApp = restClient.queryApp(originAppId)
       killedApp.appId shouldEqual originAppId
       killedApp.status shouldEqual MasterToAppMaster.AppMasterInActive
+      val newAppId = originAppId + 1
+      expectAppIsRunning(newAppId, wordCountName)
       val runningApps = restClient.listRunningApps()
       runningApps.length shouldEqual 1
       val newAppDetail = 
restClient.queryStreamingAppDetail(runningApps.head.appId)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ec04cf76/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/RestClient.scala
----------------------------------------------------------------------
diff --git 
a/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/RestClient.scala
 
b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/RestClient.scala
index 1b143af..8fa0679 100644
--- 
a/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/RestClient.scala
+++ 
b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/RestClient.scala
@@ -86,6 +86,11 @@ class RestClient(host: String, port: Int) {
     decodeAs[AppMastersData](resp).appMasters.toArray
   }
 
+  def listPendingOrRunningApps(): Array[AppMasterData] = {
+    listApps().filter(app => app.status == MasterToAppMaster.AppMasterActive
+      || app.status == MasterToAppMaster.AppMasterPending)
+  }
+
   def listRunningApps(): Array[AppMasterData] = {
     listApps().filter(_.status == MasterToAppMaster.AppMasterActive)
   }
@@ -96,7 +101,7 @@ class RestClient(host: String, port: Int) {
 
   def submitApp(jar: String, executorNum: Int, args: String = "", config: 
String = "")
     : Boolean = try {
-    var endpoint = "master/submitapp"
+    val endpoint = "master/submitapp"
 
     var options = Seq(s"jar=@$jar")
     if (config.length > 0) {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ec04cf76/services/dashboard/services/models/models.js
----------------------------------------------------------------------
diff --git a/services/dashboard/services/models/models.js 
b/services/dashboard/services/models/models.js
index 8265bac..d0f4515 100644
--- a/services/dashboard/services/models/models.js
+++ b/services/dashboard/services/models/models.js
@@ -138,6 +138,7 @@ angular.module('org.apache.gearpump.models', [])
           return angular.merge(obj, {
             // extra properties
             isRunning: obj.status === 'active',
+            isKilled: obj.status === 'inactive',
             akkaAddr: decoder._akkaAddr(obj.appMasterPath),
             // extra methods
             pageUrl: locator.app(obj.appId, obj.type),

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ec04cf76/services/dashboard/views/apps/apps.js
----------------------------------------------------------------------
diff --git a/services/dashboard/views/apps/apps.js 
b/services/dashboard/views/apps/apps.js
index ad26fd4..f5b6338 100644
--- a/services/dashboard/views/apps/apps.js
+++ b/services/dashboard/views/apps/apps.js
@@ -67,7 +67,8 @@ angular.module('dashboard')
           $stb.datetime('Start 
Time').key('startTime').canSort().styleClass('col-md-1 hidden-sm 
hidden-xs').done(),
           $stb.datetime('Stop 
Time').key('stopTime').canSort().styleClass('col-md-1 hidden-sm 
hidden-xs').done(),
           
$stb.text('User').key('user').canSort().styleClass('col-md-2').done(),
-          // group 3/3 (3-col)
+          // group 3/3 (4-col)
+          $stb.text('Status').key('status').canSort().styleClass('col-md-1 
hidden-sm hidden-xs').done(),
           $stb.button('Actions').key(['view', 'config', 'kill', 
'restart']).styleClass('col-md-3').done()
         ],
         rows: null
@@ -86,6 +87,7 @@ angular.module('dashboard')
               submissionTime: app.submissionTime,
               startTime: app.startTime,
               stopTime: app.finishTime || '-',
+              status: app.status,
               view: {
                 href: app.pageUrl,
                 text: 'Details',
@@ -94,7 +96,7 @@ angular.module('dashboard')
               },
               config: {href: app.configLink, target: '_blank', text: 'Config', 
class: 'btn-xs'},
               kill: {
-                text: 'Kill', class: 'btn-xs', disabled: !app.isRunning,
+                text: 'Kill', class: 'btn-xs', disabled: app.isKilled,
                 click: function () {
                   $dialogs.confirm('Are you sure to kill this application?', 
function () {
                     app.terminate();

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ec04cf76/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala
index 7c08b9b..24c6da8 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala
@@ -22,8 +22,9 @@ import java.lang.management.ManagementFactory
 
 import akka.actor._
 import org.apache.gearpump._
-import org.apache.gearpump.cluster.ClientToMaster.{GetLastFailure, GetStallingTasks, QueryHistoryMetrics, ShutdownApplication}
-import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterDataDetailRequest, ReplayFromTimestampWindowTrailingEdge}
+import org.apache.gearpump.cluster.AppMasterToMaster.ActivateAppMaster
+import org.apache.gearpump.cluster.ClientToMaster._
+import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterActivated, AppMasterDataDetailRequest, ReplayFromTimestampWindowTrailingEdge}
 import org.apache.gearpump.cluster.MasterToClient.{HistoryMetrics, HistoryMetricsItem, LastFailure}
 import org.apache.gearpump.cluster._
 import org.apache.gearpump.cluster.worker.WorkerId
@@ -35,7 +36,7 @@ import org.apache.gearpump.streaming._
 import org.apache.gearpump.streaming.appmaster.AppMaster._
 import org.apache.gearpump.streaming.appmaster.DagManager.{GetLatestDAG, LatestDAG, ReplaceProcessor}
 import org.apache.gearpump.streaming.appmaster.ExecutorManager.{ExecutorInfo, GetExecutorInfo}
-import org.apache.gearpump.streaming.appmaster.TaskManager.{FailedToRecover, GetTaskList, TaskList}
+import org.apache.gearpump.streaming.appmaster.TaskManager.{ApplicationReady, FailedToRecover, GetTaskList, TaskList}
 import org.apache.gearpump.streaming.executor.Executor.{ExecutorConfig, ExecutorSummary, GetExecutorSummary, QueryExecutorConfig}
 import org.apache.gearpump.streaming.storage.InMemoryAppStoreOnMaster
 import org.apache.gearpump.streaming.task._
@@ -76,7 +77,7 @@ class AppMaster(appContext: AppMasterContext, app: AppDescription) extends Appli
 
   private val store = new InMemoryAppStoreOnMaster(appId, appContext.masterProxy)
   private val dagManager = context.actorOf(Props(new DagManager(appContext.appId, userConfig, store,
-    Some(getUpdatedDAG()))))
+    Some(getUpdatedDAG))))
 
   private var taskManager: Option[ActorRef] = None
   private var clockService: Option[ActorRef] = None
@@ -102,13 +103,13 @@ class AppMaster(appContext: AppMasterContext, app: AppDescription) extends Appli
     status = "Active",
     taskCount = 0,
     tasks = Map.empty[ProcessorId, List[TaskId]],
-    jvmName = ManagementFactory.getRuntimeMXBean().getName()
+    jvmName = ManagementFactory.getRuntimeMXBean.getName
   )
 
   private val historyMetricsService = if (metricsEnabled) {
     // Registers jvm metrics
     Metrics(context.system).register(new JvmMetricsSet(
-      s"app${appId}.executor${APPMASTER_DEFAULT_EXECUTOR_ID}"))
+      s"app$appId.executor$APPMASTER_DEFAULT_EXECUTOR_ID"))
 
     val historyMetricsService = context.actorOf(Props(new HistoryMetricsService(
       s"app$appId", getHistoryMetricsConfig)))
@@ -137,6 +138,7 @@ class AppMaster(appContext: AppMasterContext, app: AppDescription) extends Appli
   override def receive: Receive = {
     taskMessageHandler orElse
       executorMessageHandler orElse
+      ready orElse
       recover orElse
       appMasterService orElse
       ActorUtil.defaultMsgHandler(self)
@@ -162,7 +164,7 @@ class AppMaster(appContext: AppMasterContext, app: AppDescription) extends Appli
     case checkpoint: ReportCheckpointClock =>
       clockService.foreach(_ forward checkpoint)
     case GetDAG =>
-      val task = sender
+      val task = sender()
       getDAG.foreach {
         dag => task ! dag
       }
@@ -255,7 +257,7 @@ class AppMaster(appContext: AppMasterContext, app: AppDescription) extends Appli
     case GetLastFailure(_) =>
       sender ! lastFailure
     case get@GetExecutorSummary(executorId) =>
-      val client = sender
+      val client = sender()
       if (executorId == APPMASTER_DEFAULT_EXECUTOR_ID) {
         client ! appMasterExecutorSummary
       } else {
@@ -267,7 +269,7 @@ class AppMaster(appContext: AppMasterContext, app: AppDescription) extends Appli
           }
       }
     case query@QueryExecutorConfig(executorId) =>
-      val client = sender
+      val client = sender()
       if (executorId == -1) {
         val systemConfig = context.system.settings.config
         sender ! ExecutorConfig(ClusterConfig.filterOutDefaultConfig(systemConfig))
@@ -281,6 +283,13 @@ class AppMaster(appContext: AppMasterContext, app: AppDescription) extends Appli
       }
   }
 
+  def ready: Receive = {
+    case ApplicationReady =>
+      masterProxy ! ActivateAppMaster(appId)
+    case AppMasterActivated(appId) =>
+      LOG.info(s"AppMaster for app$appId is activated")
+  }
+
   /** Error handling */
   def recover: Receive = {
     case FailedToRecover(errorMsg) =>
@@ -296,8 +305,8 @@ class AppMaster(appContext: AppMasterContext, app: AppDescription) extends Appli
 
   private def getMinClock: Future[TimeStamp] = {
     clockService match {
-      case Some(clockService) =>
-        (clockService ? GetLatestMinClock).asInstanceOf[Future[LatestMinClock]].map(_.clock)
+      case Some(service) =>
+        (service ? GetLatestMinClock).asInstanceOf[Future[LatestMinClock]].map(_.clock)
       case None =>
         Future.failed(new ServiceNotAvailableException("clock service not ready"))
     }
@@ -317,8 +326,8 @@ class AppMaster(appContext: AppMasterContext, app: AppDescription) extends Appli
 
   private def getTaskList: Future[TaskList] = {
     taskManager match {
-      case Some(taskManager) =>
-        (taskManager ? GetTaskList).asInstanceOf[Future[TaskList]]
+      case Some(manager) =>
+        (manager ? GetTaskList).asInstanceOf[Future[TaskList]]
       case None =>
         Future.failed(new ServiceNotAvailableException("task manager not ready"))
     }
@@ -328,13 +337,13 @@ class AppMaster(appContext: AppMasterContext, app: AppDescription) extends Appli
     (dagManager ? GetLatestDAG).asInstanceOf[Future[LatestDAG]].map(_.dag)
   }
 
-  private def getUpdatedDAG(): DAG = {
+  private def getUpdatedDAG: DAG = {
     val dag = DAG(userConfig.getValue[Graph[ProcessorDescription,
       PartitionerDescription]](StreamApplication.DAG).get)
     val updated = dag.processors.map { idAndProcessor =>
       val (id, oldProcessor) = idAndProcessor
       val newProcessor = if (oldProcessor.jar == null) {
-        oldProcessor.copy(jar = appContext.appJar.getOrElse(null))
+        oldProcessor.copy(jar = appContext.appJar.orNull)
       } else {
         oldProcessor
       }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ec04cf76/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskManager.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskManager.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskManager.scala
index 662418c..b3ee046 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskManager.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskManager.scala
@@ -120,6 +120,10 @@ private[appmaster] class TaskManager(
     dagManager ! GetLatestDAG
     LOG.info(s"goto state ApplicationReady(dag = ${state.dag.version})...")
 
+    if (state.dag.version >= 0) {
+      appMaster ! ApplicationReady
+    }
+
     val recoverRegistry = new TaskRegistry(expectedTasks = state.dag.tasks,
       deadTasks = state.taskRegistry.deadTasks)
 
@@ -271,7 +275,7 @@ private[appmaster] class TaskManager(
       }
 
     case RegisterTask(taskId, executorId, host) =>
-      val client = sender
+      val client = sender()
       val register = state.taskRegistry
       val status = register.registerTask(taskId, TaskLocation(executorId, host))
       if (status == Accept) {
@@ -400,6 +404,8 @@ private[appmaster] object TaskManager {
 
   case class TaskList(tasks: Map[TaskId, ExecutorId])
 
+  case object ApplicationReady
+
   case class FailedToRecover(errorMsg: String)
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ec04cf76/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala
index 9765278..89f9c1d 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala
@@ -33,6 +33,7 @@ import org.apache.gearpump.streaming.appmaster.ClockService.{ChangeToNewDAG, Cha
 import org.apache.gearpump.streaming.appmaster.DagManager.{GetLatestDAG, GetTaskLaunchData, LatestDAG, NewDAGDeployed, TaskLaunchData, WatchChange}
 import org.apache.gearpump.streaming.appmaster.ExecutorManager.{ExecutorResourceUsageSummary, SetTaskManager, StartExecutors, _}
 import org.apache.gearpump.streaming.appmaster.JarScheduler.ResourceRequestDetail
+import org.apache.gearpump.streaming.appmaster.TaskManager.ApplicationReady
 import org.apache.gearpump.streaming.appmaster.TaskManagerSpec.{Env, Task1, Task2}
 import org.apache.gearpump.streaming.executor.Executor.RestartTasks
 import org.apache.gearpump.streaming.task.{StartTime, TaskContext, _}
@@ -101,7 +102,7 @@ class TaskManagerSpec extends FlatSpec with Matchers with BeforeAndAfterEach {
     assert(returned.resources.deep == resourceRequest.deep)
     executorManager.reply(StartExecutorsTimeOut)
 
-    // TaskManager cannot handle the TimeOut error itself, escalate to appmaster.
+    // TaskManager cannot handle the TimeOut error itself, escalate to AppMaster.
     appMaster.expectMsg(AllocateResourceTimeOut)
   }
 
@@ -166,7 +167,7 @@ class TaskManagerSpec extends FlatSpec with Matchers with BeforeAndAfterEach {
     dagManager.expectMsgType[WatchChange]
     executorManager.expectMsgType[SetTaskManager]
 
-    // Step1: first transition from Unitialized to ApplicationReady
+    // Step1: first transition from Uninitialized to ApplicationReady
     executorManager.expectMsgType[ExecutorResourceUsageSummary]
     dagManager.expectMsgType[NewDAGDeployed]
 
@@ -208,11 +209,11 @@ class TaskManagerSpec extends FlatSpec with Matchers with BeforeAndAfterEach {
 
     // Step7: Launch Task
     val launchTaskMatch: PartialFunction[Any, RegisterTask] = {
-      case UniCast(executorId, launch: LaunchTasks) =>
+      case UniCast(_, launch: LaunchTasks) =>
         RegisterTask(launch.taskId.head, executorId, HostPort("127.0.0.1:3000"))
     }
 
-    // Taskmanager should return the latest start clock to task(0,0)
+    // TaskManager should return the latest start clock to task(0,0)
     clockService.expectMsg(GetStartClock)
     clockService.reply(StartClock(0))
 
@@ -228,7 +229,7 @@ class TaskManagerSpec extends FlatSpec with Matchers with BeforeAndAfterEach {
     // Step9: start broadcasting TaskLocations.
     import scala.concurrent.duration._
     assert(executorManager.expectMsgPF(5.seconds) {
-      case BroadCast(startAllTasks) => startAllTasks.isInstanceOf[TaskLocationsReady]
+      case BroadCast(taskLocationsReady) => taskLocationsReady.isInstanceOf[TaskLocationsReady]
     })
 
     // Step10: Executor confirm it has received TaskLocationsReceived(version, executorId)
@@ -244,11 +245,15 @@ class TaskManagerSpec extends FlatSpec with Matchers with BeforeAndAfterEach {
       case BroadCast(startAllTasks) => startAllTasks.isInstanceOf[StartAllTasks]
     })
 
-    // Step13, Tell executor Manager the updated usage status of executors.
+    // Step13, Tell ExecutorManager the updated usage status of executors.
     executorManager.expectMsgType[ExecutorResourceUsageSummary]
 
-    // Step14: transition from DynamicDAG to ApplicationReady
+    // Step14: Tell AppMaster application is ready.
+    appMaster.expectMsg(ApplicationReady)
+
+    // Step15: transition from DynamicDAG to ApplicationReady
     Env(executorManager, clockService, appMaster, executor, taskManager, scheduler)
+
   }
 }
 


Reply via email to