Repository: incubator-gearpump Updated Branches: refs/heads/master e78d6cca5 -> ec04cf766
fix GEARPUMP-83, show application pending on worker down Changes include: 1. add `activeAppMasters` set to `MasterState` and `deadAppMasters` is changed to a set of appIds. Now all existing applications will be in `appMasterRegistry` and in **pending** status by default. Those in `activeAppMasters` will have **active** status and those in `deadAppMasters` **inactive** status. 2. add a status column to the "apps" page on the dashboard. status will be one of pending, active and inactive. 3. Internally, `TaskManager` will send an `ApplicationReady` message to `AppMaster` on ApplicationReady state (with DAG version larger than -1). In turn, `AppMaster` will send an `ActivateAppMaster(appId)` message to `Master` (forwarded to `AppManager`). Then the `appId` is added to `activeAppMasters`. It is removed from `activeAppMasters` when an application is either killed or recovered. 4. add UTs and fix a bunch of warnings (typos, suspicious shadowing, etc) 5. fix integration test 6. allow to kill pending applications Author: manuzhang <[email protected]> Closes #31 from manuzhang/appmaster_status. 
Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/ec04cf76 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/ec04cf76 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/ec04cf76 Branch: refs/heads/master Commit: ec04cf7666020ba15764611ac872344549f5200c Parents: e78d6cc Author: manuzhang <[email protected]> Authored: Fri Jun 3 09:22:05 2016 +0800 Committer: manuzhang <[email protected]> Committed: Fri Jun 3 09:22:05 2016 +0800 ---------------------------------------------------------------------- .../gearpump/cluster/ClusterMessage.scala | 10 ++ .../appmaster/MasterConnectionKeeper.scala | 3 +- .../gearpump/cluster/master/AppManager.scala | 152 ++++++++++--------- .../apache/gearpump/cluster/master/Master.scala | 3 +- .../cluster/master/AppManagerSpec.scala | 11 +- .../gearpump/integrationtest/TestSpecBase.scala | 14 +- .../checklist/RestServiceSpec.scala | 6 +- .../minicluster/RestClient.scala | 7 +- services/dashboard/services/models/models.js | 1 + services/dashboard/views/apps/apps.js | 6 +- .../streaming/appmaster/AppMaster.scala | 39 +++-- .../streaming/appmaster/TaskManager.scala | 8 +- .../streaming/appmaster/TaskManagerSpec.scala | 19 ++- 13 files changed, 168 insertions(+), 111 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ec04cf76/core/src/main/scala/org/apache/gearpump/cluster/ClusterMessage.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/ClusterMessage.scala b/core/src/main/scala/org/apache/gearpump/cluster/ClusterMessage.scala index e6ab13b..8aa84b5 100644 --- a/core/src/main/scala/org/apache/gearpump/cluster/ClusterMessage.scala +++ b/core/src/main/scala/org/apache/gearpump/cluster/ClusterMessage.scala @@ -162,6 +162,12 @@ trait 
AppMasterRegisterData object AppMasterToMaster { /** + * Activate the AppMaster when an application is ready to run. + * @param appId application id + */ + case class ActivateAppMaster(appId: Int) + + /** * Register an AppMaster by providing a ActorRef, and registerData * @param registerData The registerData is provided by Master when starting the app master. * App master should return the registerData back to master. @@ -246,10 +252,14 @@ object MasterToAppMaster { /** Master confirm reception of RegisterAppMaster message */ case class AppMasterRegistered(appId: Int) + /** Master confirm reception of ActivateAppMaster message */ + case class AppMasterActivated(appId: Int) + /** Shutdown the application job */ case object ShutdownAppMaster type AppMasterStatus = String + val AppMasterPending: AppMasterStatus = "pending" val AppMasterActive: AppMasterStatus = "active" val AppMasterInActive: AppMasterStatus = "inactive" val AppMasterNonExist: AppMasterStatus = "nonexist" http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ec04cf76/core/src/main/scala/org/apache/gearpump/cluster/appmaster/MasterConnectionKeeper.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/appmaster/MasterConnectionKeeper.scala b/core/src/main/scala/org/apache/gearpump/cluster/appmaster/MasterConnectionKeeper.scala index fd2d11a..e3fbe49 100644 --- a/core/src/main/scala/org/apache/gearpump/cluster/appmaster/MasterConnectionKeeper.scala +++ b/core/src/main/scala/org/apache/gearpump/cluster/appmaster/MasterConnectionKeeper.scala @@ -46,7 +46,6 @@ class MasterConnectionKeeper( import context.dispatcher private val LOG = LogUtil.getLogger(getClass) - private var master: ActorRef = null // Subscribe self to masterProxy, masterProxy ! 
WatchMaster(self) @@ -72,7 +71,7 @@ class MasterConnectionKeeper( def masterLivenessListener: Receive = { case MasterRestarted => - LOG.info("Master restarted, re-registering appmaster....") + LOG.info("Master restarted, re-registering AppMaster....") context.become(waitMasterToConfirm(registerAppMaster)) case MasterStopped => LOG.info("Master is dead, killing this AppMaster....") http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ec04cf76/daemon/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala index 435b83e..9a3a119 100644 --- a/daemon/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala +++ b/daemon/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala @@ -47,7 +47,7 @@ private[cluster] class AppManager(kvService: ActorRef, launcher: AppMasterLaunch private val LOG: Logger = LogUtil.getLogger(getClass) - private val executorId: Int = APPMASTER_DEFAULT_EXECUTOR_ID + private val EXECUTOR_ID: Int = APPMASTER_DEFAULT_EXECUTOR_ID private val appMasterMaxRetries: Int = 5 private val appMasterRetryTimeRange: Duration = 20.seconds @@ -55,13 +55,17 @@ private[cluster] class AppManager(kvService: ActorRef, launcher: AppMasterLaunch implicit val executionContext = context.dispatcher // Next available appId - private var appId: Int = 1 + private var nextAppId: Int = 1 - // From appid to appMaster data + // From appId to appMaster data + // Applications not in activeAppMasters or deadAppMasters are in pending status private var appMasterRegistry = Map.empty[Int, (ActorRef, AppMasterRuntimeInfo)] - // Dead appmaster list - private var deadAppMasters = Map.empty[Int, (ActorRef, AppMasterRuntimeInfo)] + // Active appMaster list where applications are in active status + private var 
activeAppMasters = Set.empty[Int] + + // Dead appMaster list where applications are in inactive status + private var deadAppMasters = Set.empty[Int] private var appMasterRestartPolicies = Map.empty[Int, RestartPolicy] @@ -74,7 +78,8 @@ private[cluster] class AppManager(kvService: ActorRef, launcher: AppMasterLaunch case GetKVSuccess(_, result) => val masterState = result.asInstanceOf[MasterState] if (masterState != null) { - this.appId = masterState.maxId + 1 + this.nextAppId = masterState.maxId + 1 + this.activeAppMasters = masterState.activeAppMasters this.deadAppMasters = masterState.deadAppMasters this.appMasterRegistry = masterState.appMasterRegistry } @@ -97,20 +102,20 @@ private[cluster] class AppManager(kvService: ActorRef, launcher: AppMasterLaunch def clientMsgHandler: Receive = { case SubmitApplication(app, jar, username) => - LOG.info(s"Submit Application ${app.name}($appId) by $username...") - val client = sender + LOG.info(s"Submit Application ${app.name}($nextAppId) by $username...") + val client = sender() if (applicationNameExist(app.name)) { client ! SubmitApplicationResult(Failure( new Exception(s"Application name ${app.name} already existed"))) } else { - context.actorOf(launcher.props(appId, executorId, app, jar, username, context.parent, - Some(client)), s"launcher${appId}_${Util.randInt()}") + context.actorOf(launcher.props(nextAppId, EXECUTOR_ID, app, jar, username, context.parent, + Some(client)), s"launcher${nextAppId}_${Util.randInt()}") - val appState = new ApplicationState(appId, app.name, 0, app, jar, username, null) - appMasterRestartPolicies += appId -> + val appState = new ApplicationState(nextAppId, app.name, 0, app, jar, username, null) + appMasterRestartPolicies += nextAppId -> new RestartPolicy(appMasterMaxRetries, appMasterRetryTimeRange) - kvService ! PutKV(appId.toString, APP_STATE, appState) - appId += 1 + kvService ! 
PutKV(nextAppId.toString, APP_STATE, appState) + nextAppId += 1 } case RestartApplication(appId) => @@ -136,22 +141,25 @@ private[cluster] class AppManager(kvService: ActorRef, launcher: AppMasterLaunch case ShutdownApplication(appId) => LOG.info(s"App Manager Shutting down application $appId") - val (_, info) = appMasterRegistry.getOrElse(appId, (null, null)) - Option(info) match { + val (_, appInfo) = appMasterRegistry.get(appId) + .filter { case (_, info) => !deadAppMasters.contains(info.appId)} + .getOrElse((null, null)) + Option(appInfo) match { case Some(info) => val worker = info.worker val workerPath = Option(worker).map(_.path).orNull - LOG.info(s"Shutdown AppMaster at ${workerPath}, appId: $appId, executorId: $executorId") + LOG.info(s"Shutdown AppMaster at $workerPath, appId: $appId, executorId: $EXECUTOR_ID") cleanApplicationData(appId) - val shutdown = ShutdownExecutor(appId, executorId, + val shutdown = ShutdownExecutor(appId, EXECUTOR_ID, s"AppMaster $appId shutdown requested by master...") sendMsgWithTimeOutCallBack(worker, shutdown, 30000, shutDownExecutorTimeOut()) sender ! ShutdownApplicationResult(Success(appId)) case None => - val errorMsg = s"Failed to find regisration information for appId: $appId" + val errorMsg = s"Failed to find registration information for appId: $appId" LOG.error(errorMsg) sender ! ShutdownApplicationResult(Failure(new Exception(errorMsg))) } + case ResolveAppId(appId) => val (appMaster, _) = appMasterRegistry.getOrElse(appId, (null, null)) if (null != appMaster) { @@ -159,6 +167,7 @@ private[cluster] class AppManager(kvService: ActorRef, launcher: AppMasterLaunch } else { sender ! 
ResolveAppIdResult(Failure(new Exception(s"Can not find Application: $appId"))) } + case AppMastersDataRequest => var appMastersData = collection.mutable.ListBuffer[AppMasterData]() appMasterRegistry.foreach(pair => { @@ -166,31 +175,19 @@ private[cluster] class AppManager(kvService: ActorRef, launcher: AppMasterLaunch val appMasterPath = ActorUtil.getFullPath(context.system, appMaster.path) val workerPath = Option(info.worker).map(worker => ActorUtil.getFullPath(context.system, worker.path)) + val status = getAppMasterStatus(id) appMastersData += AppMasterData( - AppMasterActive, id, info.appName, appMasterPath, workerPath.orNull, - info.submissionTime, info.startTime, info.finishTime, info.user) - }) - - deadAppMasters.foreach(pair => { - val (id, (appMaster: ActorRef, info: AppMasterRuntimeInfo)) = pair - val appMasterPath = ActorUtil.getFullPath(context.system, appMaster.path) - val workerPath = Option(info.worker).map(worker => - ActorUtil.getFullPath(context.system, worker.path)) - - appMastersData += AppMasterData( - AppMasterInActive, id, info.appName, appMasterPath, workerPath.orNull, + status, id, info.appName, appMasterPath, workerPath.orNull, info.submissionTime, info.startTime, info.finishTime, info.user) }) sender ! 
AppMastersData(appMastersData.toList) + case QueryAppMasterConfig(appId) => val config = if (appMasterRegistry.contains(appId)) { val (_, info) = appMasterRegistry(appId) info.config - } else if (deadAppMasters.contains(appId)) { - val (_, info) = deadAppMasters(appId) - info.config } else { null } @@ -198,28 +195,19 @@ private[cluster] class AppManager(kvService: ActorRef, launcher: AppMasterLaunch case appMasterDataRequest: AppMasterDataRequest => val appId = appMasterDataRequest.appId - val (appStatus, appMaster, info) = - if (appMasterRegistry.contains(appId)) { - val (appMaster, info) = appMasterRegistry(appId) - (AppMasterActive, appMaster, info) - } else if (deadAppMasters.contains(appId)) { - val (appMaster, info) = deadAppMasters(appId) - (AppMasterInActive, appMaster, info) - } else { - (AppMasterNonExist, null, null) - } + val appStatus = getAppMasterStatus(appId) appStatus match { - case AppMasterActive | AppMasterInActive => + case AppMasterNonExist => + sender ! AppMasterData(AppMasterNonExist) + case _ => + val (appMaster, info) = appMasterRegistry(appId) val appMasterPath = ActorUtil.getFullPath(context.system, appMaster.path) val workerPath = Option(info.worker).map( worker => ActorUtil.getFullPath(context.system, worker.path)).orNull sender ! AppMasterData( appStatus, appId, info.appName, appMasterPath, workerPath, info.submissionTime, info.startTime, info.finishTime, info.user) - - case AppMasterNonExist => - sender ! 
AppMasterData(AppMasterNonExist) } } @@ -230,6 +218,18 @@ private[cluster] class AppManager(kvService: ActorRef, launcher: AppMasterLaunch LOG.error(failed.reason) } + private def getAppMasterStatus(appId: Int): AppMasterStatus = { + if (activeAppMasters.contains(appId)) { + AppMasterActive + } else if (deadAppMasters.contains(appId)) { + AppMasterInActive + } else if (appMasterRegistry.contains(appId)) { + AppMasterPending + } else { + AppMasterNonExist + } + } + private def shutDownExecutorTimeOut(): Unit = { LOG.error(s"Shut down executor time out") } @@ -239,17 +239,24 @@ private[cluster] class AppManager(kvService: ActorRef, launcher: AppMasterLaunch val startTime = System.currentTimeMillis() val register = registerBack.copy(startTime = startTime) - LOG.info(s"Register AppMaster for app: ${register.appId} $register") + LOG.info(s"Register AppMaster for app: ${register.appId}, $register") context.watch(appMaster) appMasterRegistry += register.appId -> (appMaster, register) kvService ! PutKV(MASTER_GROUP, MASTER_STATE, - MasterState(appId, appMasterRegistry, deadAppMasters)) + MasterState(nextAppId, appMasterRegistry, activeAppMasters, deadAppMasters)) sender ! AppMasterRegistered(register.appId) + + case ActivateAppMaster(appId) => + LOG.info(s"Activate AppMaster for app $appId") + activeAppMasters += appId + kvService ! PutKV(MASTER_GROUP, MASTER_STATE, + MasterState(this.nextAppId, appMasterRegistry, activeAppMasters, deadAppMasters)) + sender ! AppMasterActivated(appId) } def appDataStoreService: Receive = { case SaveAppData(appId, key, value) => - val client = sender + val client = sender() (kvService ? PutKV(appId.toString, key, value)).asInstanceOf[Future[PutKVResult]].map { case PutKVSuccess => client ! AppDataSaved @@ -257,7 +264,7 @@ private[cluster] class AppManager(kvService: ActorRef, launcher: AppMasterLaunch client ! SaveAppDataFailed } case GetAppData(appId, key) => - val client = sender + val client = sender() (kvService ? 
GetKV(appId.toString, key)).asInstanceOf[Future[GetKVResult]].map { case GetKVSuccess(privateKey, value) => client ! GetAppDataResult(key, value) @@ -268,9 +275,8 @@ private[cluster] class AppManager(kvService: ActorRef, launcher: AppMasterLaunch def terminationWatch: Receive = { case terminate: Terminated => - terminate.getAddressTerminated() - LOG.info(s"AppMaster(${terminate.actor.path}) is terminiated, " + - s"network down: ${terminate.getAddressTerminated()}") + LOG.info(s"AppMaster(${terminate.actor.path}) is terminated, " + + s"network down: ${terminate.getAddressTerminated}") // Now we assume that the only normal way to stop the application is submitting a // ShutdownApplication request @@ -300,7 +306,10 @@ private[cluster] class AppManager(kvService: ActorRef, launcher: AppMasterLaunch val appId = state.appId if (appMasterRestartPolicies.get(appId).get.allowRestart) { LOG.info(s"AppManager Recovering Application $appId...") - context.actorOf(launcher.props(appId, executorId, state.app, state.jar, state.username, + activeAppMasters -= appId + kvService ! 
PutKV(MASTER_GROUP, MASTER_STATE, + MasterState(this.nextAppId, appMasterRegistry, activeAppMasters, deadAppMasters)) + context.actorOf(launcher.props(appId, EXECUTOR_ID, state.app, state.jar, state.username, context.parent, None), s"launcher${appId}_${Util.randInt()}") } else { LOG.error(s"Application $appId failed too many times") @@ -310,22 +319,26 @@ private[cluster] class AppManager(kvService: ActorRef, launcher: AppMasterLaunch case class RecoverApplication(applicationStatus: ApplicationState) private def cleanApplicationData(appId: Int): Unit = { - // Add the dead app to dead appMaster - appMasterRegistry.get(appId).foreach { pair => - val (appMasterActor, info) = pair - deadAppMasters += appId -> (appMasterActor, info.copy( - finishTime = System.currentTimeMillis())) + if (appMasterRegistry.contains(appId)) { + // Add the dead app to dead appMasters + deadAppMasters += appId + // Remove the dead app from active appMasters + activeAppMasters -= appId + + appMasterRegistry += appId -> { + val (ref, info) = appMasterRegistry(appId) + (ref, info.copy(finishTime = System.currentTimeMillis())) + } + kvService ! PutKV(MASTER_GROUP, MASTER_STATE, + MasterState(this.nextAppId, appMasterRegistry, activeAppMasters, deadAppMasters)) + kvService ! DeleteKVGroup(appId.toString) } - - appMasterRegistry -= appId - - kvService ! PutKV(MASTER_GROUP, MASTER_STATE, - MasterState(this.appId, appMasterRegistry, deadAppMasters)) - kvService ! 
DeleteKVGroup(appId.toString) } private def applicationNameExist(appName: String): Boolean = { - appMasterRegistry.values.exists(_._2.appName == appName) + appMasterRegistry.values.exists { case (_, info) => + info.appName == appName && !deadAppMasters.contains(info.appId) + } } } @@ -337,5 +350,6 @@ object AppManager { case class MasterState( maxId: Int, appMasterRegistry: Map[Int, (ActorRef, AppMasterRuntimeInfo)], - deadAppMasters: Map[Int, (ActorRef, AppMasterRuntimeInfo)]) + activeAppMasters: Set[Int], + deadAppMasters: Set[Int]) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ec04cf76/daemon/src/main/scala/org/apache/gearpump/cluster/master/Master.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/master/Master.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/master/Master.scala index 1536a23..762cf27 100644 --- a/daemon/src/main/scala/org/apache/gearpump/cluster/master/Master.scala +++ b/daemon/src/main/scala/org/apache/gearpump/cluster/master/Master.scala @@ -187,8 +187,9 @@ private[cluster] class Master extends Actor with Stash { case request: RequestResource => scheduler forward request case registerAppMaster: RegisterAppMaster => - // Forward to appManager appManager forward registerAppMaster + case activateAppMaster: ActivateAppMaster => + appManager forward activateAppMaster case save: SaveAppData => appManager forward save case get: GetAppData => http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ec04cf76/daemon/src/test/scala/org/apache/gearpump/cluster/master/AppManagerSpec.scala ---------------------------------------------------------------------- diff --git a/daemon/src/test/scala/org/apache/gearpump/cluster/master/AppManagerSpec.scala b/daemon/src/test/scala/org/apache/gearpump/cluster/master/AppManagerSpec.scala index ae0ebcd..58e3593 100644 --- 
a/daemon/src/test/scala/org/apache/gearpump/cluster/master/AppManagerSpec.scala +++ b/daemon/src/test/scala/org/apache/gearpump/cluster/master/AppManagerSpec.scala @@ -52,20 +52,23 @@ class AppManagerSpec extends FlatSpec with Matchers with BeforeAndAfterEach with appManager = getActorSystem.actorOf(Props(new AppManager(kvService.ref, new DummyAppMasterLauncherFactory(appLauncher)))) kvService.expectMsgType[GetKV] - kvService.reply(GetKVSuccess(MASTER_STATE, MasterState(0, Map.empty, Map.empty))) + kvService.reply(GetKVSuccess(MASTER_STATE, MasterState(0, Map.empty, Set.empty, Set.empty))) } override def afterEach(): Unit = { shutdownActorSystem() } - "AppManager" should "handle appmaster message correctly" in { + "AppManager" should "handle AppMaster message correctly" in { val appMaster = TestProbe()(getActorSystem) - val worker = TestProbe()(getActorSystem) + val appId = 1 - val register = RegisterAppMaster(appMaster.ref, AppMasterRuntimeInfo(0, "appName")) + val register = RegisterAppMaster(appMaster.ref, AppMasterRuntimeInfo(appId, "appName")) appMaster.send(appManager, register) appMaster.expectMsgType[AppMasterRegistered] + + appMaster.send(appManager, ActivateAppMaster(appId)) + appMaster.expectMsgType[AppMasterActivated] } "DataStoreService" should "support Put and Get" in { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ec04cf76/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/TestSpecBase.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/TestSpecBase.scala b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/TestSpecBase.scala index dabcc71..f480475 100644 --- a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/TestSpecBase.scala +++ b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/TestSpecBase.scala @@ -63,7 +63,7 @@ trait TestSpecBase 
assert(cluster != null, "Configure MiniCluster properly in suite spec") cluster.isAlive shouldBe true - restClient.listRunningApps().isEmpty shouldBe true + restClient.listPendingOrRunningApps().isEmpty shouldBe true LOGGER.debug(s">### =============================================================") LOGGER.debug(s">###2 Start test: ${td.name}\n") } @@ -77,16 +77,16 @@ trait TestSpecBase LOGGER.info("Will restart the cluster for next test case") cluster.restart() } else { - restClient.listRunningApps().foreach(app => { + restClient.listPendingOrRunningApps().foreach(app => { commandLineClient.killApp(app.appId) shouldBe true }) } } - def expectAppIsRunning(appId: Int, expectedAppName: String): AppMasterData = { - val app = restClient.queryApp(appId) - app.status shouldEqual MasterToAppMaster.AppMasterActive - app.appName shouldEqual expectedAppName - app + def expectAppIsRunning(appId: Int, expectedAppName: String): Unit = { + Util.retryUntil(() => { + val app = restClient.queryApp(appId) + app.status == MasterToAppMaster.AppMasterActive && app.appName == expectedAppName + }, s"$expectedAppName is running") } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ec04cf76/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/RestServiceSpec.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/RestServiceSpec.scala b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/RestServiceSpec.scala index 2f5bb64..992533b 100644 --- a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/RestServiceSpec.scala +++ b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/RestServiceSpec.scala @@ -162,7 +162,7 @@ class RestServiceSpec extends TestSpecBase { killAppAndVerify(appId) } - "should fail when attempting to kill a 
stopped application" in { + "fail when attempting to kill a stopped application" in { // setup val appId = restClient.getNextAvailableAppId() val submissionSucess = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length) @@ -175,7 +175,7 @@ class RestServiceSpec extends TestSpecBase { success shouldBe false } - "should fail when attempting to kill a non-exist application" in { + "fail when attempting to kill a non-exist application" in { // setup val freeAppId = restClient.listApps().length + 1 @@ -342,6 +342,8 @@ class RestServiceSpec extends TestSpecBase { val killedApp = restClient.queryApp(originAppId) killedApp.appId shouldEqual originAppId killedApp.status shouldEqual MasterToAppMaster.AppMasterInActive + val newAppId = originAppId + 1 + expectAppIsRunning(newAppId, wordCountName) val runningApps = restClient.listRunningApps() runningApps.length shouldEqual 1 val newAppDetail = restClient.queryStreamingAppDetail(runningApps.head.appId) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ec04cf76/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/RestClient.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/RestClient.scala b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/RestClient.scala index 1b143af..8fa0679 100644 --- a/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/RestClient.scala +++ b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/RestClient.scala @@ -86,6 +86,11 @@ class RestClient(host: String, port: Int) { decodeAs[AppMastersData](resp).appMasters.toArray } + def listPendingOrRunningApps(): Array[AppMasterData] = { + listApps().filter(app => app.status == MasterToAppMaster.AppMasterActive + || app.status == MasterToAppMaster.AppMasterPending) + } + def 
listRunningApps(): Array[AppMasterData] = { listApps().filter(_.status == MasterToAppMaster.AppMasterActive) } @@ -96,7 +101,7 @@ class RestClient(host: String, port: Int) { def submitApp(jar: String, executorNum: Int, args: String = "", config: String = "") : Boolean = try { - var endpoint = "master/submitapp" + val endpoint = "master/submitapp" var options = Seq(s"jar=@$jar") if (config.length > 0) { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ec04cf76/services/dashboard/services/models/models.js ---------------------------------------------------------------------- diff --git a/services/dashboard/services/models/models.js b/services/dashboard/services/models/models.js index 8265bac..d0f4515 100644 --- a/services/dashboard/services/models/models.js +++ b/services/dashboard/services/models/models.js @@ -138,6 +138,7 @@ angular.module('org.apache.gearpump.models', []) return angular.merge(obj, { // extra properties isRunning: obj.status === 'active', + isKilled: obj.status === 'inactive', akkaAddr: decoder._akkaAddr(obj.appMasterPath), // extra methods pageUrl: locator.app(obj.appId, obj.type), http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ec04cf76/services/dashboard/views/apps/apps.js ---------------------------------------------------------------------- diff --git a/services/dashboard/views/apps/apps.js b/services/dashboard/views/apps/apps.js index ad26fd4..f5b6338 100644 --- a/services/dashboard/views/apps/apps.js +++ b/services/dashboard/views/apps/apps.js @@ -67,7 +67,8 @@ angular.module('dashboard') $stb.datetime('Start Time').key('startTime').canSort().styleClass('col-md-1 hidden-sm hidden-xs').done(), $stb.datetime('Stop Time').key('stopTime').canSort().styleClass('col-md-1 hidden-sm hidden-xs').done(), $stb.text('User').key('user').canSort().styleClass('col-md-2').done(), - // group 3/3 (3-col) + // group 3/3 (4-col) + $stb.text('Status').key('status').canSort().styleClass('col-md-1 hidden-sm hidden-xs').done(), 
$stb.button('Actions').key(['view', 'config', 'kill', 'restart']).styleClass('col-md-3').done() ], rows: null @@ -86,6 +87,7 @@ angular.module('dashboard') submissionTime: app.submissionTime, startTime: app.startTime, stopTime: app.finishTime || '-', + status: app.status, view: { href: app.pageUrl, text: 'Details', @@ -94,7 +96,7 @@ angular.module('dashboard') }, config: {href: app.configLink, target: '_blank', text: 'Config', class: 'btn-xs'}, kill: { - text: 'Kill', class: 'btn-xs', disabled: !app.isRunning, + text: 'Kill', class: 'btn-xs', disabled: app.isKilled, click: function () { $dialogs.confirm('Are you sure to kill this application?', function () { app.terminate(); http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ec04cf76/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala index 7c08b9b..24c6da8 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala @@ -22,8 +22,9 @@ import java.lang.management.ManagementFactory import akka.actor._ import org.apache.gearpump._ -import org.apache.gearpump.cluster.ClientToMaster.{GetLastFailure, GetStallingTasks, QueryHistoryMetrics, ShutdownApplication} -import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterDataDetailRequest, ReplayFromTimestampWindowTrailingEdge} +import org.apache.gearpump.cluster.AppMasterToMaster.ActivateAppMaster +import org.apache.gearpump.cluster.ClientToMaster._ +import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterActivated, AppMasterDataDetailRequest, ReplayFromTimestampWindowTrailingEdge} import org.apache.gearpump.cluster.MasterToClient.{HistoryMetrics, 
HistoryMetricsItem, LastFailure} import org.apache.gearpump.cluster._ import org.apache.gearpump.cluster.worker.WorkerId @@ -35,7 +36,7 @@ import org.apache.gearpump.streaming._ import org.apache.gearpump.streaming.appmaster.AppMaster._ import org.apache.gearpump.streaming.appmaster.DagManager.{GetLatestDAG, LatestDAG, ReplaceProcessor} import org.apache.gearpump.streaming.appmaster.ExecutorManager.{ExecutorInfo, GetExecutorInfo} -import org.apache.gearpump.streaming.appmaster.TaskManager.{FailedToRecover, GetTaskList, TaskList} +import org.apache.gearpump.streaming.appmaster.TaskManager.{ApplicationReady, FailedToRecover, GetTaskList, TaskList} import org.apache.gearpump.streaming.executor.Executor.{ExecutorConfig, ExecutorSummary, GetExecutorSummary, QueryExecutorConfig} import org.apache.gearpump.streaming.storage.InMemoryAppStoreOnMaster import org.apache.gearpump.streaming.task._ @@ -76,7 +77,7 @@ class AppMaster(appContext: AppMasterContext, app: AppDescription) extends Appli private val store = new InMemoryAppStoreOnMaster(appId, appContext.masterProxy) private val dagManager = context.actorOf(Props(new DagManager(appContext.appId, userConfig, store, - Some(getUpdatedDAG())))) + Some(getUpdatedDAG)))) private var taskManager: Option[ActorRef] = None private var clockService: Option[ActorRef] = None @@ -102,13 +103,13 @@ class AppMaster(appContext: AppMasterContext, app: AppDescription) extends Appli status = "Active", taskCount = 0, tasks = Map.empty[ProcessorId, List[TaskId]], - jvmName = ManagementFactory.getRuntimeMXBean().getName() + jvmName = ManagementFactory.getRuntimeMXBean.getName ) private val historyMetricsService = if (metricsEnabled) { // Registers jvm metrics Metrics(context.system).register(new JvmMetricsSet( - s"app${appId}.executor${APPMASTER_DEFAULT_EXECUTOR_ID}")) + s"app$appId.executor$APPMASTER_DEFAULT_EXECUTOR_ID")) val historyMetricsService = context.actorOf(Props(new HistoryMetricsService( s"app$appId", getHistoryMetricsConfig))) @@ 
-137,6 +138,7 @@ class AppMaster(appContext: AppMasterContext, app: AppDescription) extends Appli override def receive: Receive = { taskMessageHandler orElse executorMessageHandler orElse + ready orElse recover orElse appMasterService orElse ActorUtil.defaultMsgHandler(self) @@ -162,7 +164,7 @@ class AppMaster(appContext: AppMasterContext, app: AppDescription) extends Appli case checkpoint: ReportCheckpointClock => clockService.foreach(_ forward checkpoint) case GetDAG => - val task = sender + val task = sender() getDAG.foreach { dag => task ! dag } @@ -255,7 +257,7 @@ class AppMaster(appContext: AppMasterContext, app: AppDescription) extends Appli case GetLastFailure(_) => sender ! lastFailure case get@GetExecutorSummary(executorId) => - val client = sender + val client = sender() if (executorId == APPMASTER_DEFAULT_EXECUTOR_ID) { client ! appMasterExecutorSummary } else { @@ -267,7 +269,7 @@ class AppMaster(appContext: AppMasterContext, app: AppDescription) extends Appli } } case query@QueryExecutorConfig(executorId) => - val client = sender + val client = sender() if (executorId == -1) { val systemConfig = context.system.settings.config sender ! ExecutorConfig(ClusterConfig.filterOutDefaultConfig(systemConfig)) @@ -281,6 +283,13 @@ class AppMaster(appContext: AppMasterContext, app: AppDescription) extends Appli } } + def ready: Receive = { + case ApplicationReady => + masterProxy ! ActivateAppMaster(appId) + case AppMasterActivated(appId) => + LOG.info(s"AppMaster for app$appId is activated") + } + /** Error handling */ def recover: Receive = { case FailedToRecover(errorMsg) => @@ -296,8 +305,8 @@ class AppMaster(appContext: AppMasterContext, app: AppDescription) extends Appli private def getMinClock: Future[TimeStamp] = { clockService match { - case Some(clockService) => - (clockService ? GetLatestMinClock).asInstanceOf[Future[LatestMinClock]].map(_.clock) + case Some(service) => + (service ? 
GetLatestMinClock).asInstanceOf[Future[LatestMinClock]].map(_.clock) case None => Future.failed(new ServiceNotAvailableException("clock service not ready")) } @@ -317,8 +326,8 @@ class AppMaster(appContext: AppMasterContext, app: AppDescription) extends Appli private def getTaskList: Future[TaskList] = { taskManager match { - case Some(taskManager) => - (taskManager ? GetTaskList).asInstanceOf[Future[TaskList]] + case Some(manager) => + (manager ? GetTaskList).asInstanceOf[Future[TaskList]] case None => Future.failed(new ServiceNotAvailableException("task manager not ready")) } @@ -328,13 +337,13 @@ class AppMaster(appContext: AppMasterContext, app: AppDescription) extends Appli (dagManager ? GetLatestDAG).asInstanceOf[Future[LatestDAG]].map(_.dag) } - private def getUpdatedDAG(): DAG = { + private def getUpdatedDAG: DAG = { val dag = DAG(userConfig.getValue[Graph[ProcessorDescription, PartitionerDescription]](StreamApplication.DAG).get) val updated = dag.processors.map { idAndProcessor => val (id, oldProcessor) = idAndProcessor val newProcessor = if (oldProcessor.jar == null) { - oldProcessor.copy(jar = appContext.appJar.getOrElse(null)) + oldProcessor.copy(jar = appContext.appJar.orNull) } else { oldProcessor } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ec04cf76/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskManager.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskManager.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskManager.scala index 662418c..b3ee046 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskManager.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskManager.scala @@ -120,6 +120,10 @@ private[appmaster] class TaskManager( dagManager ! 
GetLatestDAG LOG.info(s"goto state ApplicationReady(dag = ${state.dag.version})...") + if (state.dag.version >= 0) { + appMaster ! ApplicationReady + } + val recoverRegistry = new TaskRegistry(expectedTasks = state.dag.tasks, deadTasks = state.taskRegistry.deadTasks) @@ -271,7 +275,7 @@ private[appmaster] class TaskManager( } case RegisterTask(taskId, executorId, host) => - val client = sender + val client = sender() val register = state.taskRegistry val status = register.registerTask(taskId, TaskLocation(executorId, host)) if (status == Accept) { @@ -400,6 +404,8 @@ private[appmaster] object TaskManager { case class TaskList(tasks: Map[TaskId, ExecutorId]) + case object ApplicationReady + case class FailedToRecover(errorMsg: String) /** http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ec04cf76/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala index 9765278..89f9c1d 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala @@ -33,6 +33,7 @@ import org.apache.gearpump.streaming.appmaster.ClockService.{ChangeToNewDAG, Cha import org.apache.gearpump.streaming.appmaster.DagManager.{GetLatestDAG, GetTaskLaunchData, LatestDAG, NewDAGDeployed, TaskLaunchData, WatchChange} import org.apache.gearpump.streaming.appmaster.ExecutorManager.{ExecutorResourceUsageSummary, SetTaskManager, StartExecutors, _} import org.apache.gearpump.streaming.appmaster.JarScheduler.ResourceRequestDetail +import org.apache.gearpump.streaming.appmaster.TaskManager.ApplicationReady import org.apache.gearpump.streaming.appmaster.TaskManagerSpec.{Env, Task1, 
Task2} import org.apache.gearpump.streaming.executor.Executor.RestartTasks import org.apache.gearpump.streaming.task.{StartTime, TaskContext, _} @@ -101,7 +102,7 @@ class TaskManagerSpec extends FlatSpec with Matchers with BeforeAndAfterEach { assert(returned.resources.deep == resourceRequest.deep) executorManager.reply(StartExecutorsTimeOut) - // TaskManager cannot handle the TimeOut error itself, escalate to appmaster. + // TaskManager cannot handle the TimeOut error itself, escalate to AppMaster. appMaster.expectMsg(AllocateResourceTimeOut) } @@ -166,7 +167,7 @@ class TaskManagerSpec extends FlatSpec with Matchers with BeforeAndAfterEach { dagManager.expectMsgType[WatchChange] executorManager.expectMsgType[SetTaskManager] - // Step1: first transition from Unitialized to ApplicationReady + // Step1: first transition from Uninitialized to ApplicationReady executorManager.expectMsgType[ExecutorResourceUsageSummary] dagManager.expectMsgType[NewDAGDeployed] @@ -208,11 +209,11 @@ class TaskManagerSpec extends FlatSpec with Matchers with BeforeAndAfterEach { // Step7: Launch Task val launchTaskMatch: PartialFunction[Any, RegisterTask] = { - case UniCast(executorId, launch: LaunchTasks) => + case UniCast(_, launch: LaunchTasks) => RegisterTask(launch.taskId.head, executorId, HostPort("127.0.0.1:3000")) } - // Taskmanager should return the latest start clock to task(0,0) + // TaskManager should return the latest start clock to task(0,0) clockService.expectMsg(GetStartClock) clockService.reply(StartClock(0)) @@ -228,7 +229,7 @@ class TaskManagerSpec extends FlatSpec with Matchers with BeforeAndAfterEach { // Step9: start broadcasting TaskLocations. 
import scala.concurrent.duration._ assert(executorManager.expectMsgPF(5.seconds) { - case BroadCast(startAllTasks) => startAllTasks.isInstanceOf[TaskLocationsReady] + case BroadCast(taskLocationsReady) => taskLocationsReady.isInstanceOf[TaskLocationsReady] }) // Step10: Executor confirm it has received TaskLocationsReceived(version, executorId) @@ -244,11 +245,15 @@ class TaskManagerSpec extends FlatSpec with Matchers with BeforeAndAfterEach { case BroadCast(startAllTasks) => startAllTasks.isInstanceOf[StartAllTasks] }) - // Step13, Tell executor Manager the updated usage status of executors. + // Step13, Tell ExecutorManager the updated usage status of executors. executorManager.expectMsgType[ExecutorResourceUsageSummary] - // Step14: transition from DynamicDAG to ApplicationReady + // Step14: Tell AppMaster application is ready. + appMaster.expectMsg(ApplicationReady) + + // Step15: transition from DynamicDAG to ApplicationReady Env(executorManager, clockService, appMaster, executor, taskManager, scheduler) + } }
