[GEARPUMP-252] Return application status to client Author: huafengw <[email protected]>
Closes #134 from huafengw/fix265. Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/96312a2a Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/96312a2a Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/96312a2a Branch: refs/heads/master Commit: 96312a2acaa547ba486fb2a827eeaf87da0cb271 Parents: 2913a1f Author: huafengw <[email protected]> Authored: Tue Jan 24 12:17:19 2017 +0800 Committer: manuzhang <[email protected]> Committed: Tue Jan 24 12:17:31 2017 +0800 ---------------------------------------------------------------------- .../gearpump/cluster/AppDescription.scala | 37 ++- .../gearpump/cluster/ClusterMessage.scala | 58 ++-- .../appmaster/AppMasterRuntimeEnvironment.scala | 26 +- .../appmaster/AppMasterRuntimeInfo.scala | 38 --- .../cluster/appmaster/ApplicationMetaData.scala | 30 ++ .../appmaster/ApplicationRuntimeInfo.scala | 52 ++++ .../cluster/appmaster/ApplicationState.scala | 47 ---- .../cluster/client/RunningApplication.scala | 34 ++- .../gearpump/cluster/master/AppManager.scala | 271 ++++++++++--------- .../cluster/master/AppMasterLauncher.scala | 40 +-- .../apache/gearpump/cluster/master/Master.scala | 6 +- .../org/apache/gearpump/util/ActorUtil.scala | 15 +- .../cluster/appmaster/AppManagerSpec.scala | 37 ++- .../AppMasterRuntimeEnvironmentSpec.scala | 2 +- .../appmaster/MasterConnectionKeeperSpec.scala | 2 +- .../apache/gearpump/cluster/main/MainSpec.scala | 5 +- .../cluster/master/ApplicationStateSpec.scala | 11 +- .../DistShellAppMasterSpec.scala | 10 +- .../DistServiceAppMasterSpec.scala | 9 +- .../experiments/storm/main/GearpumpNimbus.scala | 9 +- .../gearpump/integrationtest/TestSpecBase.scala | 5 +- .../checklist/CommandLineSpec.scala | 6 +- .../checklist/RestServiceSpec.scala | 9 +- .../checklist/StabilitySpec.scala | 9 +- .../minicluster/CommandLineClient.scala | 11 +- .../minicluster/RestClient.scala | 11 +- 
.../gearpump/services/util/UpickleUtil.scala | 21 +- .../services/AppMasterServiceSpec.scala | 6 +- .../streaming/appmaster/AppMaster.scala | 20 +- .../appmaster/StreamAppMasterSummary.scala | 5 +- .../gearpump/streaming/StreamingTestUtil.scala | 6 +- .../streaming/appmaster/AppMasterSpec.scala | 19 +- .../appmaster/ExecutorManagerSpec.scala | 2 +- 33 files changed, 472 insertions(+), 397 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/96312a2a/core/src/main/scala/org/apache/gearpump/cluster/AppDescription.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/AppDescription.scala b/core/src/main/scala/org/apache/gearpump/cluster/AppDescription.scala index 91c2675..c31f01f 100644 --- a/core/src/main/scala/org/apache/gearpump/cluster/AppDescription.scala +++ b/core/src/main/scala/org/apache/gearpump/cluster/AppDescription.scala @@ -18,15 +18,16 @@ package org.apache.gearpump.cluster -import scala.reflect.ClassTag +import java.io.Serializable import akka.actor.{Actor, ActorRef, ActorSystem} import com.typesafe.config.{Config, ConfigFactory} - import org.apache.gearpump.cluster.appmaster.WorkerInfo import org.apache.gearpump.cluster.scheduler.Resource import org.apache.gearpump.jarstore.FilePath +import scala.reflect.ClassTag + /** * This contains all information to run an application * @@ -38,8 +39,7 @@ import org.apache.gearpump.jarstore.FilePath * really need to change it, please use ClusterConfigSource(filePath) to * construct the object, while filePath points to the .conf file. 
*/ -case class AppDescription( - name: String, appMaster: String, userConfig: UserConfig, +case class AppDescription(name: String, appMaster: String, userConfig: UserConfig, clusterConfig: Config = ConfigFactory.empty()) /** @@ -96,8 +96,6 @@ abstract class ApplicationMaster extends Actor * @param appJar application Jar. If the jar is already in classpath, then it can be None. * @param masterProxy The proxy to master actor, it bridges the messages between appmaster * and master - * @param registerData AppMaster are required to send this data to Master by when doing - * RegisterAppMaster. */ case class AppMasterContext( appId: Int, @@ -105,8 +103,7 @@ case class AppMasterContext( resource: Resource, workerInfo: WorkerInfo, appJar: Option[AppJar], - masterProxy: ActorRef, - registerData: AppMasterRegisterData) + masterProxy: ActorRef) /** * Jar file container in the cluster @@ -142,4 +139,26 @@ case class ExecutorContext( case class ExecutorJVMConfig( classPath: Array[String], jvmArguments: Array[String], mainClass: String, arguments: Array[String], jar: Option[AppJar], username: String, - executorAkkaConfig: Config = ConfigFactory.empty()) \ No newline at end of file + executorAkkaConfig: Config = ConfigFactory.empty()) + +sealed abstract class ApplicationStatus(val status: String) + extends Serializable{ + override def toString: String = status +} + +sealed abstract class ApplicationTerminalStatus(override val status: String) + extends ApplicationStatus(status) + +object ApplicationStatus { + case object PENDING extends ApplicationStatus("pending") + + case object ACTIVE extends ApplicationStatus("active") + + case object SUCCEEDED extends ApplicationTerminalStatus("succeeded") + + case object FAILED extends ApplicationTerminalStatus("failed") + + case object TERMINATED extends ApplicationTerminalStatus("terminated") + + case object NONEXIST extends ApplicationStatus("nonexist") +} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/96312a2a/core/src/main/scala/org/apache/gearpump/cluster/ClusterMessage.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/ClusterMessage.scala b/core/src/main/scala/org/apache/gearpump/cluster/ClusterMessage.scala index 8aa84b5..73e0649 100644 --- a/core/src/main/scala/org/apache/gearpump/cluster/ClusterMessage.scala +++ b/core/src/main/scala/org/apache/gearpump/cluster/ClusterMessage.scala @@ -18,16 +18,14 @@ package org.apache.gearpump.cluster -import org.apache.gearpump.cluster.worker.{WorkerSummary, WorkerId} +import org.apache.gearpump.cluster.worker.{WorkerId, WorkerSummary} import scala.util.Try - import akka.actor.ActorRef import com.typesafe.config.Config - import org.apache.gearpump.TimeStamp -import org.apache.gearpump.cluster.MasterToAppMaster.AppMasterStatus -import org.apache.gearpump.cluster.master.{MasterNode, MasterSummary} +import org.apache.gearpump.cluster.appmaster.WorkerInfo +import org.apache.gearpump.cluster.master.MasterSummary import org.apache.gearpump.cluster.scheduler.{Resource, ResourceAllocation, ResourceRequest} import org.apache.gearpump.metrics.Metrics.MetricType @@ -114,6 +112,11 @@ object ClientToMaster { * Request app master for a short list of cluster app that administrators should be aware of. */ case class GetLastFailure(appId: Int) + + /** + * Register a client to wait application's result + */ + case class RegisterAppResultListener(appId: Int) } object MasterToClient { @@ -155,26 +158,20 @@ object MasterToClient { /** Return the last error of this streaming application job */ case class LastFailure(time: TimeStamp, error: String) -} -trait AppMasterRegisterData + sealed trait ApplicationResult -object AppMasterToMaster { + case class ApplicationSucceeded(appId: Int) extends ApplicationResult - /** - * Activate the AppMaster when an application is ready to run. 
- * @param appId application id - */ - case class ActivateAppMaster(appId: Int) + case class ApplicationFailed(appId: Int, error: Throwable) extends ApplicationResult +} + +object AppMasterToMaster { /** - * Register an AppMaster by providing a ActorRef, and registerData - * @param registerData The registerData is provided by Master when starting the app master. - * App master should return the registerData back to master. - * Typically registerData hold some context information for this app Master. + * Register an AppMaster by providing a ActorRef, and workerInfo which is running on */ - - case class RegisterAppMaster(appMaster: ActorRef, registerData: AppMasterRegisterData) + case class RegisterAppMaster(appId: Int, appMaster: ActorRef, workerInfo: WorkerInfo) case class InvalidAppMaster(appId: Int, appMaster: String, reason: Throwable) @@ -210,7 +207,7 @@ object AppMasterToMaster { def appId: Int def appName: String def actorPath: String - def status: AppMasterStatus + def status: ApplicationStatus def startTime: TimeStamp def uptime: TimeStamp def user: String @@ -222,7 +219,7 @@ object AppMasterToMaster { appType: String = "general", appName: String = null, actorPath: String = null, - status: AppMasterStatus = MasterToAppMaster.AppMasterActive, + status: ApplicationStatus = ApplicationStatus.ACTIVE, startTime: TimeStamp = 0L, uptime: TimeStamp = 0L, user: String = null) @@ -242,6 +239,12 @@ object AppMasterToMaster { /** Response to GetMasterData */ case class MasterData(masterDescription: MasterSummary) + + /** + * Denotes the application state change of an app. 
+ */ + case class ApplicationStatusChanged(appId: Int, newStatus: ApplicationStatus, + timeStamp: TimeStamp, error: Throwable) } object MasterToAppMaster { @@ -258,17 +261,10 @@ object MasterToAppMaster { /** Shutdown the application job */ case object ShutdownAppMaster - type AppMasterStatus = String - val AppMasterPending: AppMasterStatus = "pending" - val AppMasterActive: AppMasterStatus = "active" - val AppMasterInActive: AppMasterStatus = "inactive" - val AppMasterNonExist: AppMasterStatus = "nonexist" - sealed trait StreamingType - case class AppMasterData( - status: AppMasterStatus, appId: Int = 0, appName: String = null, appMasterPath: String = null, - workerPath: String = null, submissionTime: TimeStamp = 0, startTime: TimeStamp = 0, - finishTime: TimeStamp = 0, user: String = null) + case class AppMasterData(status: ApplicationStatus, appId: Int = 0, appName: String = null, + appMasterPath: String = null, workerPath: String = null, submissionTime: TimeStamp = 0, + startTime: TimeStamp = 0, finishTime: TimeStamp = 0, user: String = null) case class AppMasterDataRequest(appId: Int, detail: Boolean = false) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/96312a2a/core/src/main/scala/org/apache/gearpump/cluster/appmaster/AppMasterRuntimeEnvironment.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/appmaster/AppMasterRuntimeEnvironment.scala b/core/src/main/scala/org/apache/gearpump/cluster/appmaster/AppMasterRuntimeEnvironment.scala index 170e56a..946a4ae 100644 --- a/core/src/main/scala/org/apache/gearpump/cluster/appmaster/AppMasterRuntimeEnvironment.scala +++ b/core/src/main/scala/org/apache/gearpump/cluster/appmaster/AppMasterRuntimeEnvironment.scala @@ -28,6 +28,8 @@ import org.apache.gearpump.cluster.master.MasterProxy import org.apache.gearpump.cluster.{AppDescription, AppMasterContext} import org.apache.gearpump.util.LogUtil +import 
scala.concurrent.duration._ + /** * This serves as runtime environment for AppMaster. * When starting an AppMaster, we need to setup the connection to master, @@ -50,11 +52,9 @@ class AppMasterRuntimeEnvironment( masterConnectionKeeperFactory: (MasterActorRef, RegisterAppMaster, ListenerActorRef) => Props) extends Actor { - val appId = appContextInput.appId + private val appId = appContextInput.appId private val LOG = LogUtil.getLogger(getClass, app = appId) - import scala.concurrent.duration._ - private val master = context.actorOf( masterFactory(appId, context.actorOf(Props(new MasterProxy(masters, 30.seconds))))) private val appContext = appContextInput.copy(masterProxy = master) @@ -63,24 +63,25 @@ class AppMasterRuntimeEnvironment( private val appMaster = context.actorOf(appMasterFactory(appContext, app)) context.watch(appMaster) - private val registerAppMaster = RegisterAppMaster(appMaster, appContext.registerData) + private val registerAppMaster = RegisterAppMaster(appId, appMaster, appContext.workerInfo) + private val masterConnectionKeeper = context.actorOf( masterConnectionKeeperFactory(master, registerAppMaster, self)) context.watch(masterConnectionKeeper) def receive: Receive = { case MasterConnected => - LOG.info(s"Master is connected, start AppMaster ${appId}...") + LOG.info(s"Master is connected, start AppMaster $appId...") appMaster ! 
StartAppMaster case MasterStopped => - LOG.error(s"Master is stopped, stop AppMaster ${appId}...") + LOG.error(s"Master is stopped, stop AppMaster $appId...") context.stop(self) case Terminated(actor) => actor match { case `appMaster` => - LOG.error(s"AppMaster ${appId} is stopped, shutdown myself") + LOG.error(s"AppMaster $appId is stopped, shutdown myself") context.stop(self) case `masterConnectionKeeper` => - LOG.error(s"Master connection keeper is stopped, appId: ${appId}, shutdown myself") + LOG.error(s"Master connection keeper is stopped, appId: $appId, shutdown myself") context.stop(self) case _ => // Skip } @@ -89,9 +90,8 @@ class AppMasterRuntimeEnvironment( object AppMasterRuntimeEnvironment { - def props( - masters: Iterable[ActorPath], app: AppDescription, appContextInput: AppMasterContext) - : Props = { + def props(masters: Iterable[ActorPath], app: AppDescription, appContextInput: AppMasterContext + ): Props = { val master = (appId: AppId, masterProxy: MasterActorRef) => MasterWithExecutorSystemProvider.props(appId, masterProxy) @@ -103,8 +103,8 @@ object AppMasterRuntimeEnvironment { RegisterAppMaster, listener: ListenerActorRef) => Props(new MasterConnectionKeeper( registerAppMaster, master, masterStatusListener = listener)) - Props(new AppMasterRuntimeEnvironment( - appContextInput, app, masters, master, appMaster, masterConnectionKeeper)) + Props(new AppMasterRuntimeEnvironment(appContextInput, app, masters, + master, appMaster, masterConnectionKeeper)) } /** http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/96312a2a/core/src/main/scala/org/apache/gearpump/cluster/appmaster/AppMasterRuntimeInfo.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/appmaster/AppMasterRuntimeInfo.scala b/core/src/main/scala/org/apache/gearpump/cluster/appmaster/AppMasterRuntimeInfo.scala deleted file mode 100644 index b3ec88c..0000000 --- 
a/core/src/main/scala/org/apache/gearpump/cluster/appmaster/AppMasterRuntimeInfo.scala +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gearpump.cluster.appmaster - -import akka.actor.ActorRef -import com.typesafe.config.Config - -import org.apache.gearpump._ -import org.apache.gearpump.cluster.AppMasterRegisterData - -/** Run time info used to start an AppMaster */ -case class AppMasterRuntimeInfo( - appId: Int, - // AppName is the unique Id for an application - appName: String, - worker: ActorRef = null, - user: String = null, - submissionTime: TimeStamp = 0, - startTime: TimeStamp = 0, - finishTime: TimeStamp = 0, - config: Config = null) - extends AppMasterRegisterData http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/96312a2a/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ApplicationMetaData.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ApplicationMetaData.scala b/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ApplicationMetaData.scala new file mode 100644 index 0000000..b011a0d --- /dev/null +++ 
b/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ApplicationMetaData.scala @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.cluster.appmaster + +import org.apache.gearpump.cluster.{AppDescription, AppJar} +import akka.routing.MurmurHash._ + +/** + * The meta data of an application, which stores the crucial information of how to launch + * the application, like the application jar file location. This data is distributed + * across the masters.
+ */ +case class ApplicationMetaData(appId: Int, attemptId: Int, appDesc: AppDescription, + jar: Option[AppJar], username: String) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/96312a2a/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ApplicationRuntimeInfo.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ApplicationRuntimeInfo.scala b/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ApplicationRuntimeInfo.scala new file mode 100644 index 0000000..d9b73e2 --- /dev/null +++ b/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ApplicationRuntimeInfo.scala @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.cluster.appmaster + +import akka.actor.ActorRef +import com.typesafe.config.{Config, ConfigFactory} +import org.apache.gearpump.TimeStamp +import org.apache.gearpump.cluster.{ApplicationStatus, ApplicationTerminalStatus} + +/** Run time info of Application */ +case class ApplicationRuntimeInfo( + appId: Int, + // AppName is the unique Id for an application + appName: String, + appMaster: ActorRef = ActorRef.noSender, + worker: ActorRef = ActorRef.noSender, + user: String = "", + submissionTime: TimeStamp = 0, + startTime: TimeStamp = 0, + finishTime: TimeStamp = 0, + config: Config = ConfigFactory.empty(), + status: ApplicationStatus = ApplicationStatus.NONEXIST) { + + def onAppMasterRegistered(appMaster: ActorRef, worker: ActorRef): ApplicationRuntimeInfo = { + this.copy(appMaster = appMaster, worker = worker) + } + + def onAppMasterActivated(timeStamp: TimeStamp): ApplicationRuntimeInfo = { + this.copy(startTime = timeStamp, status = ApplicationStatus.ACTIVE) + } + + def onFinalStatus(timeStamp: TimeStamp, finalStatus: ApplicationTerminalStatus): + ApplicationRuntimeInfo = { + this.copy(finishTime = timeStamp, status = finalStatus) + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/96312a2a/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ApplicationState.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ApplicationState.scala b/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ApplicationState.scala deleted file mode 100644 index 7240113..0000000 --- a/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ApplicationState.scala +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gearpump.cluster.appmaster - -import org.apache.gearpump.cluster.{AppDescription, AppJar} - -/** - * This state for single application, it is be distributed across the masters. - */ -case class ApplicationState( - appId: Int, appName: String, attemptId: Int, app: AppDescription, jar: Option[AppJar], - username: String, state: Any) extends Serializable { - - override def equals(other: Any): Boolean = { - other match { - case that: ApplicationState => - if (appId == that.appId && attemptId == that.attemptId) { - true - } else { - false - } - case _ => - false - } - } - - override def hashCode: Int = { - import akka.routing.MurmurHash._ - extendHash(appId, attemptId, startMagicA, startMagicB) - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/96312a2a/core/src/main/scala/org/apache/gearpump/cluster/client/RunningApplication.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/client/RunningApplication.scala b/core/src/main/scala/org/apache/gearpump/cluster/client/RunningApplication.scala index 153c824..973e1e8 100644 --- a/core/src/main/scala/org/apache/gearpump/cluster/client/RunningApplication.scala +++ 
b/core/src/main/scala/org/apache/gearpump/cluster/client/RunningApplication.scala @@ -17,16 +17,19 @@ */ package org.apache.gearpump.cluster.client +import akka.actor.{ActorRef, ActorSystem} import akka.pattern.ask -import akka.actor.ActorRef import akka.util.Timeout -import org.apache.gearpump.cluster.ClientToMaster.{ResolveAppId, ShutdownApplication} -import org.apache.gearpump.cluster.MasterToClient.{ResolveAppIdResult, ShutdownApplicationResult} -import org.apache.gearpump.util.ActorUtil +import org.apache.gearpump.cluster.ClientToMaster.{RegisterAppResultListener, ResolveAppId, ShutdownApplication} +import org.apache.gearpump.cluster.MasterToClient._ +import org.apache.gearpump.cluster.client.RunningApplication._ +import org.apache.gearpump.util.{ActorUtil, LogUtil} +import org.slf4j.Logger +import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.Future +import scala.concurrent.duration._ import scala.util.{Failure, Success} -import scala.concurrent.ExecutionContext.Implicits.global class RunningApplication(val appId: Int, master: ActorRef, timeout: Timeout) { lazy val appMaster: Future[ActorRef] = resolveAppMaster(appId) @@ -40,6 +43,21 @@ class RunningApplication(val appId: Int, master: ActorRef, timeout: Timeout) { } } + /** + * This function will block until the application finishes or fails.
+ * If failed, an exception will be thrown out + */ + def waitUntilFinish(): Unit = { + val result = ActorUtil.askActor[ApplicationResult](master, + RegisterAppResultListener(appId), INF_TIMEOUT) + result match { + case failed: ApplicationFailed => + throw failed.error + case _ => + LOG.info(s"Application $appId succeeded") + } + } + def askAppMaster[T](msg: Any): Future[T] = { appMaster.flatMap(_.ask(msg)(timeout).asInstanceOf[Future[T]]) } @@ -50,3 +68,9 @@ class RunningApplication(val appId: Int, master: ActorRef, timeout: Timeout) { } } +object RunningApplication { + private val LOG: Logger = LogUtil.getLogger(getClass) + // This magic number is derived from Akka's configuration, which is the maximum delay + private val INF_TIMEOUT = new Timeout(2147482 seconds) +} + http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/96312a2a/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala b/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala index 0ae7365..24e70dd 100644 --- a/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala +++ b/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala @@ -20,13 +20,16 @@ package org.apache.gearpump.cluster.master import akka.actor._ import akka.pattern.ask +import com.typesafe.config.ConfigFactory +import org.apache.gearpump._ import org.apache.gearpump.cluster.AppMasterToMaster.{AppDataSaved, SaveAppDataFailed, _} import org.apache.gearpump.cluster.AppMasterToWorker._ +import org.apache.gearpump.cluster.{ApplicationStatus, ApplicationTerminalStatus} import org.apache.gearpump.cluster.ClientToMaster._ import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMasterDataRequest, AppMastersDataRequest, _} import org.apache.gearpump.cluster.MasterToClient._ import 
org.apache.gearpump.cluster.WorkerToAppMaster.{ShutdownExecutorFailed, _} -import org.apache.gearpump.cluster.appmaster.{AppMasterRuntimeInfo, ApplicationState} +import org.apache.gearpump.cluster.appmaster.{ApplicationMetaData, ApplicationRuntimeInfo} import org.apache.gearpump.cluster.master.AppManager._ import org.apache.gearpump.cluster.master.InMemoryKVService.{GetKVResult, PutKVResult, PutKVSuccess, _} import org.apache.gearpump.cluster.master.Master._ @@ -46,7 +49,6 @@ private[cluster] class AppManager(kvService: ActorRef, launcher: AppMasterLaunch private val LOG: Logger = LogUtil.getLogger(getClass) - private val EXECUTOR_ID: Int = APPMASTER_DEFAULT_EXECUTOR_ID private val appMasterMaxRetries: Int = 5 private val appMasterRetryTimeRange: Duration = 20.seconds @@ -56,15 +58,8 @@ private[cluster] class AppManager(kvService: ActorRef, launcher: AppMasterLaunch // Next available appId private var nextAppId: Int = 1 - // From appId to appMaster data - // Applications not in activeAppMasters or deadAppMasters are in pending status - private var appMasterRegistry = Map.empty[Int, (ActorRef, AppMasterRuntimeInfo)] - - // Active appMaster list where applications are in active status - private var activeAppMasters = Set.empty[Int] - - // Dead appMaster list where applications are in inactive status - private var deadAppMasters = Set.empty[Int] + private var applicationRegistry = Map.empty[Int, ApplicationRuntimeInfo] + private var appResultListeners = Map.empty[Int, List[ActorRef]] private var appMasterRestartPolicies = Map.empty[Int, RestartPolicy] @@ -78,9 +73,7 @@ private[cluster] class AppManager(kvService: ActorRef, launcher: AppMasterLaunch val masterState = result.asInstanceOf[MasterState] if (masterState != null) { this.nextAppId = masterState.maxId + 1 - this.activeAppMasters = masterState.activeAppMasters - this.deadAppMasters = masterState.deadAppMasters - this.appMasterRegistry = masterState.appMasterRegistry + this.applicationRegistry = 
masterState.applicationRegistry } context.become(receiveHandler) unstashAll() @@ -107,25 +100,33 @@ private[cluster] class AppManager(kvService: ActorRef, launcher: AppMasterLaunch client ! SubmitApplicationResult(Failure( new Exception(s"Application name ${app.name} already existed"))) } else { - context.actorOf(launcher.props(nextAppId, EXECUTOR_ID, app, jar, username, context.parent, - Some(client)), s"launcher${nextAppId}_${Util.randInt()}") - - val appState = new ApplicationState(nextAppId, app.name, 0, app, jar, username, null) + context.actorOf(launcher.props(nextAppId, APPMASTER_DEFAULT_EXECUTOR_ID, app, jar, username, + context.parent, Some(client)), s"launcher${nextAppId}_${Util.randInt()}") appMasterRestartPolicies += nextAppId -> new RestartPolicy(appMasterMaxRetries, appMasterRetryTimeRange) - kvService ! PutKV(nextAppId.toString, APP_STATE, appState) + + val appRuntimeInfo = ApplicationRuntimeInfo(nextAppId, app.name, + user = username, + submissionTime = System.currentTimeMillis(), + config = app.clusterConfig, + status = ApplicationStatus.PENDING) + applicationRegistry += nextAppId -> appRuntimeInfo + val appMetaData = ApplicationMetaData(nextAppId, 0, app, jar, username) + kvService ! PutKV(nextAppId.toString, APP_METADATA, appMetaData) + nextAppId += 1 + kvService ! PutKV(MASTER_GROUP, MASTER_STATE, MasterState(nextAppId, applicationRegistry)) } case RestartApplication(appId) => val client = sender() - (kvService ? GetKV(appId.toString, APP_STATE)).asInstanceOf[Future[GetKVResult]].map { + (kvService ? GetKV(appId.toString, APP_METADATA)).asInstanceOf[Future[GetKVResult]].map { case GetKVSuccess(_, result) => - val appState = result.asInstanceOf[ApplicationState] - if (appState != null) { + val metaData = result.asInstanceOf[ApplicationMetaData] + if (metaData != null) { LOG.info(s"Shutting down the application (restart), $appId") self ! 
ShutdownApplication(appId) - self.tell(SubmitApplication(appState.app, appState.jar, appState.username), client) + self.tell(SubmitApplication(metaData.appDesc, metaData.jar, metaData.username), client) } else { client ! SubmitApplicationResult(Failure( new Exception(s"Failed to restart, because the application $appId does not exist.") @@ -140,19 +141,16 @@ private[cluster] class AppManager(kvService: ActorRef, launcher: AppMasterLaunch case ShutdownApplication(appId) => LOG.info(s"App Manager Shutting down application $appId") - val (_, appInfo) = appMasterRegistry.get(appId) - .filter { case (_, info) => !deadAppMasters.contains(info.appId)} - .getOrElse((null, null)) - Option(appInfo) match { + val appInfo = applicationRegistry.get(appId). + filter(!_.status.isInstanceOf[ApplicationTerminalStatus]) + appInfo match { case Some(info) => - val worker = info.worker - val workerPath = Option(worker).map(_.path).orNull - LOG.info(s"Shutdown AppMaster at $workerPath, appId: $appId, executorId: $EXECUTOR_ID") - cleanApplicationData(appId) - val shutdown = ShutdownExecutor(appId, EXECUTOR_ID, - s"AppMaster $appId shutdown requested by master...") - sendMsgWithTimeOutCallBack(worker, shutdown, 30000, shutDownExecutorTimeOut()) + killAppMaster(appId, info.worker) sender ! ShutdownApplicationResult(Success(appId)) + // Here we use the function to make sure the status is consistent because + // sending another message to self will involve timing problem + this.onApplicationStatusChanged(appId, ApplicationStatus.TERMINATED, + System.currentTimeMillis(), null) case None => val errorMsg = s"Failed to find registration information for appId: $appId" LOG.error(errorMsg) @@ -160,54 +158,49 @@ private[cluster] class AppManager(kvService: ActorRef, launcher: AppMasterLaunch } case ResolveAppId(appId) => - val (appMaster, _) = appMasterRegistry.getOrElse(appId, (null, null)) - if (null != appMaster) { - sender ! ResolveAppIdResult(Success(appMaster)) - } else { - sender ! 
ResolveAppIdResult(Failure(new Exception(s"Can not find Application: $appId"))) + val appMaster = applicationRegistry.get(appId).map(_.appMaster) + appMaster match { + case Some(appMasterActor) => + sender ! ResolveAppIdResult(Success(appMasterActor)) + case None => + sender ! ResolveAppIdResult(Failure(new Exception(s"Can not find Application: $appId"))) } case AppMastersDataRequest => var appMastersData = collection.mutable.ListBuffer[AppMasterData]() - appMasterRegistry.foreach(pair => { - val (id, (appMaster: ActorRef, info: AppMasterRuntimeInfo)) = pair - val appMasterPath = ActorUtil.getFullPath(context.system, appMaster.path) + applicationRegistry.foreach(pair => { + val (id, info: ApplicationRuntimeInfo) = pair + val appMasterPath = ActorUtil.getFullPath(context.system, info.appMaster) val workerPath = Option(info.worker).map(worker => - ActorUtil.getFullPath(context.system, worker.path)) - val status = getAppMasterStatus(id) + ActorUtil.getFullPath(context.system, worker)) appMastersData += AppMasterData( - status, id, info.appName, appMasterPath, workerPath.orNull, + info.status, id, info.appName, appMasterPath, workerPath.orNull, info.submissionTime, info.startTime, info.finishTime, info.user) }) - sender ! AppMastersData(appMastersData.toList) case QueryAppMasterConfig(appId) => - val config = - if (appMasterRegistry.contains(appId)) { - val (_, info) = appMasterRegistry(appId) - info.config - } else { - null - } + val config = applicationRegistry.get(appId).map(_.config).getOrElse(ConfigFactory.empty()) sender ! AppMasterConfig(config) case appMasterDataRequest: AppMasterDataRequest => val appId = appMasterDataRequest.appId - val appStatus = getAppMasterStatus(appId) - - appStatus match { - case AppMasterNonExist => - sender ! 
AppMasterData(AppMasterNonExist) - case _ => - val (appMaster, info) = appMasterRegistry(appId) - val appMasterPath = ActorUtil.getFullPath(context.system, appMaster.path) + val appRuntimeInfo = applicationRegistry.get(appId) + appRuntimeInfo match { + case Some(info) => + val appMasterPath = ActorUtil.getFullPath(context.system, info.appMaster.path) val workerPath = Option(info.worker).map( worker => ActorUtil.getFullPath(context.system, worker.path)).orNull sender ! AppMasterData( - appStatus, appId, info.appName, appMasterPath, workerPath, + info.status, appId, info.appName, appMasterPath, workerPath, info.submissionTime, info.startTime, info.finishTime, info.user) + case None => + sender ! AppMasterData(ApplicationStatus.NONEXIST) } + + case RegisterAppResultListener(appId) => + val listenerList = appResultListeners.getOrElse(appId, List.empty[ActorRef]) + appResultListeners += appId -> (listenerList :+ sender()) } def workerMessage: Receive = { @@ -217,40 +210,62 @@ private[cluster] class AppManager(kvService: ActorRef, launcher: AppMasterLaunch LOG.error(failed.reason) } - private def getAppMasterStatus(appId: Int): AppMasterStatus = { - if (activeAppMasters.contains(appId)) { - AppMasterActive - } else if (deadAppMasters.contains(appId)) { - AppMasterInActive - } else if (appMasterRegistry.contains(appId)) { - AppMasterPending - } else { - AppMasterNonExist - } - } + def appMasterMessage: Receive = { + case RegisterAppMaster(appId, appMaster, workerInfo) => + val appInfo = applicationRegistry.get(appId) + appInfo match { + case Some(info) => + LOG.info(s"Register AppMaster for app: $appId") + val updatedInfo = info.onAppMasterRegistered(appMaster, workerInfo.ref) + context.watch(appMaster) + applicationRegistry += appId -> updatedInfo + kvService ! PutKV(MASTER_GROUP, MASTER_STATE, MasterState(nextAppId, applicationRegistry)) + sender ! 
AppMasterRegistered(appId) + case None => + LOG.error(s"Can not find submitted application $appId") + } - private def shutDownExecutorTimeOut(): Unit = { - LOG.error(s"Shut down executor time out") + case ApplicationStatusChanged(appId, newStatus, timeStamp, error) => + onApplicationStatusChanged(appId, newStatus, timeStamp, error) } - def appMasterMessage: Receive = { - case RegisterAppMaster(appMaster, registerBack: AppMasterRuntimeInfo) => - val startTime = System.currentTimeMillis() - val register = registerBack.copy(startTime = startTime) - - LOG.info(s"Register AppMaster for app: ${register.appId}, $register") - context.watch(appMaster) - appMasterRegistry += register.appId -> (appMaster, register) - kvService ! PutKV(MASTER_GROUP, MASTER_STATE, - MasterState(nextAppId, appMasterRegistry, activeAppMasters, deadAppMasters)) - sender ! AppMasterRegistered(register.appId) - - case ActivateAppMaster(appId) => - LOG.info(s"Activate AppMaster for app $appId") - activeAppMasters += appId - kvService ! PutKV(MASTER_GROUP, MASTER_STATE, - MasterState(this.nextAppId, appMasterRegistry, activeAppMasters, deadAppMasters)) - sender ! AppMasterActivated(appId) + private def onApplicationStatusChanged(appId: Int, newStatus: ApplicationStatus, + timeStamp: TimeStamp, error: Throwable): Unit = { + applicationRegistry.get(appId) match { + case Some(appRuntimeInfo) => + var updatedStatus: ApplicationRuntimeInfo = null + LOG.info(s"Application $appId change to ${newStatus.toString} at $timeStamp") + newStatus match { + case ApplicationStatus.ACTIVE => + updatedStatus = appRuntimeInfo.onAppMasterActivated(timeStamp) + sender ! AppMasterActivated(appId) + case [email protected] => + killAppMaster(appId, appRuntimeInfo.worker) + updatedStatus = appRuntimeInfo.onFinalStatus(timeStamp, succeeded) + appResultListeners.getOrElse(appId, List.empty).foreach{ client => + client ! 
ApplicationSucceeded(appId) + } + case [email protected] => + killAppMaster(appId, appRuntimeInfo.worker) + updatedStatus = appRuntimeInfo.onFinalStatus(timeStamp, failed) + appResultListeners.getOrElse(appId, List.empty).foreach{ client => + client ! ApplicationFailed(appId, error) + } + case [email protected] => + updatedStatus = appRuntimeInfo.onFinalStatus(timeStamp, terminated) + case status => + LOG.error(s"App $appId should not change it's status to $status") + } + + if (newStatus.isInstanceOf[ApplicationTerminalStatus]) { + kvService ! DeleteKVGroup(appId.toString) + } + applicationRegistry += appId -> updatedStatus + kvService ! PutKV(MASTER_GROUP, MASTER_STATE, MasterState(nextAppId, applicationRegistry)) + case None => + LOG.error(s"Can not find application runtime info for appId $appId when it's " + + s"status changed to ${newStatus.toString}") + } } def appDataStoreService: Receive = { @@ -265,7 +280,7 @@ private[cluster] class AppManager(kvService: ActorRef, launcher: AppMasterLaunch case GetAppData(appId, key) => val client = sender() (kvService ? GetKV(appId.toString, key)).asInstanceOf[Future[GetKVResult]].map { - case GetKVSuccess(privateKey, value) => + case GetKVSuccess(_, value) => client ! GetAppDataResult(key, value) case GetKVFailed(ex) => client ! GetAppDataResult(key, null) @@ -279,20 +294,23 @@ private[cluster] class AppManager(kvService: ActorRef, launcher: AppMasterLaunch // Now we assume that the only normal way to stop the application is submitting a // ShutdownApplication request - val application = appMasterRegistry.find { appInfo => - val (_, (actorRef, _)) = appInfo - actorRef.compareTo(terminate.actor) == 0 + val application = applicationRegistry.find { appInfo => + val (_, runtimeInfo) = appInfo + terminate.actor.equals(runtimeInfo.appMaster) && + !runtimeInfo.status.isInstanceOf[ApplicationTerminalStatus] } if (application.nonEmpty) { val appId = application.get._1 - (kvService ? 
GetKV(appId.toString, APP_STATE)).asInstanceOf[Future[GetKVResult]].map { + (kvService ? GetKV(appId.toString, APP_METADATA)).asInstanceOf[Future[GetKVResult]].map { case GetKVSuccess(_, result) => - val appState = result.asInstanceOf[ApplicationState] - if (appState != null) { + val appMetadata = result.asInstanceOf[ApplicationMetaData] + if (appMetadata != null) { LOG.info(s"Recovering application, $appId") - self ! RecoverApplication(appState) + val updatedInfo = application.get._2.copy(status = ApplicationStatus.PENDING) + applicationRegistry += appId -> updatedInfo + self ! RecoverApplication(appMetadata) } else { - LOG.error(s"Cannot find application state for $appId") + LOG.error(s"Cannot find application meta data for $appId") } case GetKVFailed(ex) => LOG.error(s"Cannot find master state to recover") @@ -305,50 +323,41 @@ private[cluster] class AppManager(kvService: ActorRef, launcher: AppMasterLaunch val appId = state.appId if (appMasterRestartPolicies.get(appId).get.allowRestart) { LOG.info(s"AppManager Recovering Application $appId...") - activeAppMasters -= appId kvService ! 
PutKV(MASTER_GROUP, MASTER_STATE, - MasterState(this.nextAppId, appMasterRegistry, activeAppMasters, deadAppMasters)) - context.actorOf(launcher.props(appId, EXECUTOR_ID, state.app, state.jar, state.username, - context.parent, None), s"launcher${appId}_${Util.randInt()}") + MasterState(this.nextAppId, applicationRegistry)) + context.actorOf(launcher.props(appId, APPMASTER_DEFAULT_EXECUTOR_ID, state.appDesc, + state.jar, state.username, context.parent, None), s"launcher${appId}_${Util.randInt()}") } else { LOG.error(s"Application $appId failed too many times") } } - case class RecoverApplication(applicationStatus: ApplicationState) - - private def cleanApplicationData(appId: Int): Unit = { - if (appMasterRegistry.contains(appId)) { - // Add the dead app to dead appMasters - deadAppMasters += appId - // Remove the dead app from active appMasters - activeAppMasters -= appId - - appMasterRegistry += appId -> { - val (ref, info) = appMasterRegistry(appId) - (ref, info.copy(finishTime = System.currentTimeMillis())) - } - kvService ! PutKV(MASTER_GROUP, MASTER_STATE, - MasterState(this.nextAppId, appMasterRegistry, activeAppMasters, deadAppMasters)) - kvService ! 
DeleteKVGroup(appId.toString) - } + private def killAppMaster(appId: Int, worker: ActorRef): Unit = { + val workerPath = Option(worker).map(_.path).orNull + LOG.info(s"Shutdown AppMaster at $workerPath, appId: $appId, executorId:" + + s" $APPMASTER_DEFAULT_EXECUTOR_ID") + val shutdown = ShutdownExecutor(appId, APPMASTER_DEFAULT_EXECUTOR_ID, + s"AppMaster $appId shutdown requested by master...") + sendMsgWithTimeOutCallBack(worker, shutdown, 30000, shutDownExecutorTimeOut()) } private def applicationNameExist(appName: String): Boolean = { - appMasterRegistry.values.exists { case (_, info) => - info.appName == appName && !deadAppMasters.contains(info.appId) + applicationRegistry.values.exists { info => + info.appName == appName && !info.status.isInstanceOf[ApplicationTerminalStatus] } } + + private def shutDownExecutorTimeOut(): Unit = { + LOG.error(s"Shut down executor time out") + } } object AppManager { - final val APP_STATE = "app_state" + final val APP_METADATA = "app_metadata" // The id is used in KVStore final val MASTER_STATE = "master_state" - case class MasterState( - maxId: Int, - appMasterRegistry: Map[Int, (ActorRef, AppMasterRuntimeInfo)], - activeAppMasters: Set[Int], - deadAppMasters: Set[Int]) + case class RecoverApplication(appMetaData: ApplicationMetaData) + + case class MasterState(maxId: Int, applicationRegistry: Map[Int, ApplicationRuntimeInfo]) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/96312a2a/core/src/main/scala/org/apache/gearpump/cluster/master/AppMasterLauncher.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/master/AppMasterLauncher.scala b/core/src/main/scala/org/apache/gearpump/cluster/master/AppMasterLauncher.scala index 9305d5c..2d79558 100644 --- a/core/src/main/scala/org/apache/gearpump/cluster/master/AppMasterLauncher.scala +++ b/core/src/main/scala/org/apache/gearpump/cluster/master/AppMasterLauncher.scala @@ -34,7 +34,7 
@@ import org.apache.gearpump.cluster.AppMasterToWorker.{LaunchExecutor, ShutdownEx import org.apache.gearpump.cluster.MasterToAppMaster.ResourceAllocated import org.apache.gearpump.cluster.MasterToClient.SubmitApplicationResult import org.apache.gearpump.cluster.WorkerToAppMaster.ExecutorLaunchRejected -import org.apache.gearpump.cluster.appmaster.{AppMasterRuntimeEnvironment, AppMasterRuntimeInfo, WorkerInfo} +import org.apache.gearpump.cluster.appmaster.{AppMasterRuntimeEnvironment, ApplicationRuntimeInfo, WorkerInfo} import org.apache.gearpump.cluster.scheduler.{Resource, ResourceAllocation, ResourceRequest} import org.apache.gearpump.cluster.{AppDescription, AppJar, _} import org.apache.gearpump.transport.HostPort @@ -43,7 +43,6 @@ import org.apache.gearpump.util.Constants._ import org.apache.gearpump.util.{ActorSystemBooter, ActorUtil, LogUtil, Util} /** - * * AppMasterLauncher is a child Actor of AppManager, it is responsible * to launch the AppMaster on the cluster. */ @@ -53,11 +52,11 @@ class AppMasterLauncher( extends Actor { private val LOG: Logger = LogUtil.getLogger(getClass, app = appId) - val scheduler = context.system.scheduler - val systemConfig = context.system.settings.config - val TIMEOUT = Duration(15, TimeUnit.SECONDS) + private val scheduler = context.system.scheduler + private val systemConfig = context.system.settings.config + private val TIMEOUT = Duration(15, TimeUnit.SECONDS) - val appMasterAkkaConfig: Config = app.clusterConfig + private val appMasterAkkaConfig: Config = app.clusterConfig LOG.info(s"Ask Master resource to start AppMaster $appId...") master ! 
RequestResource(appId, ResourceRequest(Resource(1), WorkerId.unspecified)) @@ -66,18 +65,12 @@ class AppMasterLauncher( def waitForResourceAllocation: Receive = { case ResourceAllocated(allocations) => - val ResourceAllocation(resource, worker, workerId) = allocations(0) - LOG.info(s"Resource allocated for appMaster $appId on worker ${workerId}(${worker.path})") - - val submissionTime = System.currentTimeMillis() + LOG.info(s"Resource allocated for appMaster $appId on worker $workerId(${worker.path})") - val appMasterInfo = AppMasterRuntimeInfo(appId, app.name, worker, username, - submissionTime, config = appMasterAkkaConfig) val workerInfo = WorkerInfo(workerId, worker) - val appMasterContext = - AppMasterContext(appId, username, resource, workerInfo, jar, null, appMasterInfo) - LOG.info(s"Try to launch a executor for AppMaster on worker ${workerId} for app $appId") + val appMasterContext = AppMasterContext(appId, username, resource, workerInfo, jar, null) + LOG.info(s"Try to launch a executor for AppMaster on worker $workerId for app $appId") val name = ActorUtil.actorNameForExecutor(appId, executorId) val selfPath = ActorUtil.getFullPath(context.system, self.path) @@ -88,12 +81,11 @@ class AppMasterLauncher( username, appMasterAkkaConfig) worker ! 
LaunchExecutor(appId, executorId, resource, executorJVM) - context.become(waitForActorSystemToStart(worker, appMasterContext, app.userConfig, resource)) + context.become(waitForActorSystemToStart(worker, appMasterContext, resource)) } - def waitForActorSystemToStart( - worker: ActorRef, appContext: AppMasterContext, user: UserConfig, resource: Resource) - : Receive = { + def waitForActorSystemToStart(worker: ActorRef, appContext: AppMasterContext, + resource: Resource): Receive = { case ExecutorLaunchRejected(reason, ex) => LOG.error(s"Executor Launch failed reason: $reason", ex) LOG.info(s"reallocate resource $resource to start appmaster") @@ -105,8 +97,8 @@ class AppMasterLauncher( val masterAddress = systemConfig.getStringList(GEARPUMP_CLUSTER_MASTERS) .asScala.map(HostPort(_)).map(ActorUtil.getMasterActorPath) - sender ! CreateActor( - AppMasterRuntimeEnvironment.props(masterAddress, app, appContext), s"appdaemon$appId") + sender ! CreateActor(AppMasterRuntimeEnvironment.props(masterAddress, app, appContext), + s"appdaemon$appId") import context.dispatcher val appMasterTimeout = scheduler.scheduleOnce(TIMEOUT, self, @@ -121,7 +113,7 @@ class AppMasterLauncher( LOG.info(s"AppMaster is created, mission complete...") replyToClient(SubmitApplicationResult(Success(appId))) context.stop(self) - case CreateActorFailed(name, reason) => + case CreateActorFailed(_, reason) => cancel.cancel() worker ! 
ShutdownExecutor(appId, executorId, reason.getMessage) replyToClient(SubmitApplicationResult(Failure(reason))) @@ -129,9 +121,7 @@ class AppMasterLauncher( } def replyToClient(result: SubmitApplicationResult): Unit = { - if (client.isDefined) { - client.get.tell(result, master) - } + client.foreach(_.tell(result, master)) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/96312a2a/core/src/main/scala/org/apache/gearpump/cluster/master/Master.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/master/Master.scala b/core/src/main/scala/org/apache/gearpump/cluster/master/Master.scala index 6b4df07..8da417e 100644 --- a/core/src/main/scala/org/apache/gearpump/cluster/master/Master.scala +++ b/core/src/main/scala/org/apache/gearpump/cluster/master/Master.scala @@ -184,8 +184,6 @@ private[cluster] class Master extends Actor with Stash { scheduler forward request case registerAppMaster: RegisterAppMaster => appManager forward registerAppMaster - case activateAppMaster: ActivateAppMaster => - appManager forward activateAppMaster case save: SaveAppData => appManager forward save case get: GetAppData => @@ -215,6 +213,8 @@ private[cluster] class Master extends Actor with Stash { case invalidAppMaster: InvalidAppMaster => appManager forward invalidAppMaster + case statusChanged: ApplicationStatusChanged => + appManager forward statusChanged } import scala.util.{Failure, Success} @@ -257,6 +257,8 @@ private[cluster] class Master extends Actor with Stash { appManager forward query case QueryMasterConfig => sender ! 
MasterConfig(ClusterConfig.filterOutDefaultConfig(systemConfig)) + case register: RegisterAppResultListener => + appManager forward register } def disassociated: Receive = { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/96312a2a/core/src/main/scala/org/apache/gearpump/util/ActorUtil.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/util/ActorUtil.scala b/core/src/main/scala/org/apache/gearpump/util/ActorUtil.scala index 82c7fe2..0f49a59 100644 --- a/core/src/main/scala/org/apache/gearpump/util/ActorUtil.scala +++ b/core/src/main/scala/org/apache/gearpump/util/ActorUtil.scala @@ -18,7 +18,7 @@ package org.apache.gearpump.util -import org.apache.gearpump.cluster.AppMasterContext +import org.apache.gearpump.cluster.{ApplicationStatus, AppMasterContext} import org.apache.gearpump.cluster.worker.WorkerId import scala.concurrent.{Await, ExecutionContext, Future} @@ -27,7 +27,7 @@ import akka.actor._ import akka.pattern.ask import org.slf4j.Logger import akka.util.Timeout -import org.apache.gearpump.cluster.AppMasterToMaster.{ActivateAppMaster, GetAllWorkers} +import org.apache.gearpump.cluster.AppMasterToMaster.{ApplicationStatusChanged, GetAllWorkers} import org.apache.gearpump.cluster.ClientToMaster.{ResolveAppId, ResolveWorkerId} import org.apache.gearpump.cluster.MasterToAppMaster.WorkerList import org.apache.gearpump.cluster.MasterToClient.{ResolveAppIdResult, ResolveWorkerIdResult} @@ -44,6 +44,14 @@ object ActorUtil { system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress } + def getFullPath(system: ActorSystem, actorRef: ActorRef): String = { + if (actorRef != ActorRef.noSender) { + getFullPath(system, actorRef.path) + } else { + "" + } + } + def getFullPath(system: ActorSystem, path: ActorPath): String = { path.toStringWithAddress(getSystemAddress(system)) } @@ -102,7 +110,8 @@ object ActorUtil { def tellMasterIfApplicationReady(workerNum: Option[Int], 
executorSystemNum: Int, appContext: AppMasterContext): Unit = { if (workerNum.contains(executorSystemNum)) { - appContext.masterProxy ! ActivateAppMaster(appContext.appId) + appContext.masterProxy ! ApplicationStatusChanged(appContext.appId, ApplicationStatus.ACTIVE, + System.currentTimeMillis(), null) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/96312a2a/core/src/test/scala/org/apache/gearpump/cluster/appmaster/AppManagerSpec.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/gearpump/cluster/appmaster/AppManagerSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/appmaster/AppManagerSpec.scala index f9b0762..ab0275a 100644 --- a/core/src/test/scala/org/apache/gearpump/cluster/appmaster/AppManagerSpec.scala +++ b/core/src/test/scala/org/apache/gearpump/cluster/appmaster/AppManagerSpec.scala @@ -21,13 +21,14 @@ package org.apache.gearpump.cluster.appmaster import akka.actor.{Actor, ActorRef, Props} import akka.testkit.TestProbe import com.typesafe.config.Config -import org.apache.gearpump.cluster.AppMasterToMaster.{AppDataSaved, _} +import org.apache.gearpump.cluster.AppMasterToMaster.{AppDataSaved, RegisterAppMaster, _} import org.apache.gearpump.cluster.ClientToMaster.{ResolveAppId, ShutdownApplication, SubmitApplication} import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMasterRegistered, AppMastersData, AppMastersDataRequest, _} import org.apache.gearpump.cluster.MasterToClient.{ResolveAppIdResult, ShutdownApplicationResult, SubmitApplicationResult} -import org.apache.gearpump.cluster.master.{AppMasterLauncherFactory, AppManager} +import org.apache.gearpump.cluster.master.{AppManager, AppMasterLauncherFactory} import org.apache.gearpump.cluster.master.AppManager._ import org.apache.gearpump.cluster.master.InMemoryKVService.{GetKV, GetKVSuccess, PutKV, PutKVSuccess} +import org.apache.gearpump.cluster.worker.WorkerId import 
org.apache.gearpump.cluster.{TestUtil, _} import org.apache.gearpump.util.LogUtil import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers} @@ -51,7 +52,7 @@ class AppManagerSpec extends FlatSpec with Matchers with BeforeAndAfterEach with appManager = getActorSystem.actorOf(Props(new AppManager(kvService.ref, new DummyAppMasterLauncherFactory(appLauncher)))) kvService.expectMsgType[GetKV] - kvService.reply(GetKVSuccess(MASTER_STATE, MasterState(0, Map.empty, Set.empty, Set.empty))) + kvService.reply(GetKVSuccess(MASTER_STATE, MasterState(0, Map.empty))) } override def afterEach(): Unit = { @@ -59,14 +60,24 @@ class AppManagerSpec extends FlatSpec with Matchers with BeforeAndAfterEach with } "AppManager" should "handle AppMaster message correctly" in { + val app = TestUtil.dummyApp + val submit = SubmitApplication(app, None, "username") + val client = TestProbe()(getActorSystem) + + client.send(appManager, submit) + val appMaster = TestProbe()(getActorSystem) val appId = 1 - val register = RegisterAppMaster(appMaster.ref, AppMasterRuntimeInfo(appId, "appName")) + kvService.expectMsgType[PutKV] + kvService.expectMsgType[PutKV] + appLauncher.expectMsg(LauncherStarted(appId)) + val register = RegisterAppMaster(appId, appMaster.ref, WorkerInfo(WorkerId(1, 0), null)) appMaster.send(appManager, register) appMaster.expectMsgType[AppMasterRegistered] - appMaster.send(appManager, ActivateAppMaster(appId)) + val active = ApplicationStatusChanged(appId, ApplicationStatus.ACTIVE, 0, null) + appMaster.send(appManager, active) appMaster.expectMsgType[AppMasterActivated] } @@ -101,7 +112,7 @@ class AppManagerSpec extends FlatSpec with Matchers with BeforeAndAfterEach with assert(mockClient.receiveN(1).head.asInstanceOf[ResolveAppIdResult].appMaster.isFailure) mockClient.send(appManager, AppMasterDataRequest(1)) - mockClient.expectMsg(AppMasterData(AppMasterNonExist)) + mockClient.expectMsg(AppMasterData(ApplicationStatus.NONEXIST)) } "AppManager" should "reject the application 
submission if the app name already existed" in { @@ -115,9 +126,10 @@ class AppManagerSpec extends FlatSpec with Matchers with BeforeAndAfterEach with client.send(appManager, submit) kvService.expectMsgType[PutKV] + kvService.expectMsgType[PutKV] appLauncher.expectMsg(LauncherStarted(appId)) - appMaster.send(appManager, RegisterAppMaster(appMaster.ref, - AppMasterRuntimeInfo(appId, app.name))) + val register = RegisterAppMaster(appId, appMaster.ref, WorkerInfo(WorkerId(1, 0), worker.ref)) + appMaster.send(appManager, register) appMaster.expectMsgType[AppMasterRegistered] client.send(appManager, submit) @@ -135,9 +147,10 @@ class AppManagerSpec extends FlatSpec with Matchers with BeforeAndAfterEach with client.send(appManager, submit) kvService.expectMsgType[PutKV] + kvService.expectMsgType[PutKV] appLauncher.expectMsg(LauncherStarted(appId)) - appMaster.send(appManager, RegisterAppMaster(appMaster.ref, - AppMasterRuntimeInfo(appId, app.name))) + val register = RegisterAppMaster(appId, appMaster.ref, WorkerInfo(WorkerId(1, 0), worker.ref)) + appMaster.send(appManager, register) kvService.expectMsgType[PutKV] appMaster.expectMsgType[AppMasterRegistered] @@ -157,8 +170,8 @@ class AppManagerSpec extends FlatSpec with Matchers with BeforeAndAfterEach with // Do recovery getActorSystem.stop(appMaster.ref) kvService.expectMsgType[GetKV] - val appState = ApplicationState(appId, "application1", 1, app, None, "username", null) - kvService.reply(GetKVSuccess(APP_STATE, appState)) + val appState = ApplicationMetaData(appId, 1, app, None, "username") + kvService.reply(GetKVSuccess(APP_METADATA, appState)) appLauncher.expectMsg(LauncherStarted(appId)) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/96312a2a/core/src/test/scala/org/apache/gearpump/cluster/appmaster/AppMasterRuntimeEnvironmentSpec.scala ---------------------------------------------------------------------- diff --git 
a/core/src/test/scala/org/apache/gearpump/cluster/appmaster/AppMasterRuntimeEnvironmentSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/appmaster/AppMasterRuntimeEnvironmentSpec.scala index a41856d..3a698ae 100644 --- a/core/src/test/scala/org/apache/gearpump/cluster/appmaster/AppMasterRuntimeEnvironmentSpec.scala +++ b/core/src/test/scala/org/apache/gearpump/cluster/appmaster/AppMasterRuntimeEnvironmentSpec.scala @@ -112,7 +112,7 @@ class AppMasterRuntimeEnvironmentSpec extends FlatSpec with Matchers with Before } private def setupAppMasterRuntimeEnv(): TestAppMasterEnv = { - val appContext = AppMasterContext(0, null, null, null, null, null, null) + val appContext = AppMasterContext(0, null, null, null, null, null) val app = AppDescription("app", "AppMasterClass", null, null) val master = TestProbe() val masterFactory = (_: AppId, _: MasterActorRef) => toProps(master) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/96312a2a/core/src/test/scala/org/apache/gearpump/cluster/appmaster/MasterConnectionKeeperSpec.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/gearpump/cluster/appmaster/MasterConnectionKeeperSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/appmaster/MasterConnectionKeeperSpec.scala index 163da0a..3864cc3 100644 --- a/core/src/test/scala/org/apache/gearpump/cluster/appmaster/MasterConnectionKeeperSpec.scala +++ b/core/src/test/scala/org/apache/gearpump/cluster/appmaster/MasterConnectionKeeperSpec.scala @@ -35,8 +35,8 @@ import org.apache.gearpump.cluster.master.MasterProxy.WatchMaster class MasterConnectionKeeperSpec extends FlatSpec with Matchers with BeforeAndAfterAll { implicit var system: ActorSystem = null - val register = RegisterAppMaster(null, null) val appId = 0 + val register = RegisterAppMaster(appId, null, null) override def beforeAll(): Unit = { system = ActorSystem("test", TestUtil.DEFAULT_CONFIG) 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/96312a2a/core/src/test/scala/org/apache/gearpump/cluster/main/MainSpec.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/gearpump/cluster/main/MainSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/main/MainSpec.scala index 2166976..0ad6883 100644 --- a/core/src/test/scala/org/apache/gearpump/cluster/main/MainSpec.scala +++ b/core/src/test/scala/org/apache/gearpump/cluster/main/MainSpec.scala @@ -28,7 +28,7 @@ import org.apache.gearpump.cluster.MasterToClient.{ReplayApplicationResult, Reso import org.apache.gearpump.cluster.MasterToWorker.WorkerRegistered import org.apache.gearpump.cluster.WorkerToMaster.RegisterNewWorker import org.apache.gearpump.cluster.master.MasterProxy -import org.apache.gearpump.cluster.{MasterHarness, TestUtil} +import org.apache.gearpump.cluster.{ApplicationStatus, MasterHarness, TestUtil} import org.apache.gearpump.transport.HostPort import org.apache.gearpump.util.Constants._ import org.apache.gearpump.util.{Constants, LogUtil, Util} @@ -107,7 +107,8 @@ class MainSpec extends FlatSpec with Matchers with BeforeAndAfterEach with Maste } masterReceiver.expectMsg(PROCESS_BOOT_TIME, AppMastersDataRequest) - masterReceiver.reply(AppMastersData(List(AppMasterData(AppMasterActive, 0, "appName")))) + masterReceiver.reply(AppMastersData(List(AppMasterData( + ApplicationStatus.ACTIVE, 0, "appName")))) } "Kill" should "be started without exception" in { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/96312a2a/core/src/test/scala/org/apache/gearpump/cluster/master/ApplicationStateSpec.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/gearpump/cluster/master/ApplicationStateSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/master/ApplicationStateSpec.scala index a8adaf0..6593836 100644 --- 
a/core/src/test/scala/org/apache/gearpump/cluster/master/ApplicationStateSpec.scala +++ b/core/src/test/scala/org/apache/gearpump/cluster/master/ApplicationStateSpec.scala @@ -18,16 +18,17 @@ package org.apache.gearpump.cluster.master +import org.apache.gearpump.cluster.AppDescription import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers} - -import org.apache.gearpump.cluster.appmaster.ApplicationState +import org.apache.gearpump.cluster.appmaster.ApplicationMetaData class ApplicationStateSpec extends FlatSpec with Matchers with BeforeAndAfterEach { "ApplicationState" should "check equal with respect to only appId and attemptId" in { - val stateA = ApplicationState(0, "application0", 0, null, null, null, "A") - val stateB = ApplicationState(0, "application0", 0, null, null, null, "B") - val stateC = ApplicationState(0, "application1", 1, null, null, null, "A") + val appDescription = AppDescription("app", "AppMaster", null) + val stateA = ApplicationMetaData(0, 0, appDescription, null, null) + val stateB = ApplicationMetaData(0, 0, appDescription, null, null) + val stateC = ApplicationMetaData(0, 1, appDescription, null, null) assert(stateA == stateB) assert(stateA.hashCode == stateB.hashCode) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/96312a2a/examples/distributedshell/src/test/scala/org/apache/gearpump/examples/distributedshell/DistShellAppMasterSpec.scala ---------------------------------------------------------------------- diff --git a/examples/distributedshell/src/test/scala/org/apache/gearpump/examples/distributedshell/DistShellAppMasterSpec.scala b/examples/distributedshell/src/test/scala/org/apache/gearpump/examples/distributedshell/DistShellAppMasterSpec.scala index e22abaf..f86a78a 100644 --- a/examples/distributedshell/src/test/scala/org/apache/gearpump/examples/distributedshell/DistShellAppMasterSpec.scala +++ b/examples/distributedshell/src/test/scala/org/apache/gearpump/examples/distributedshell/DistShellAppMasterSpec.scala 
@@ -28,7 +28,7 @@ import org.apache.gearpump.cluster.AppMasterToMaster.{GetAllWorkers, RegisterApp import org.apache.gearpump.cluster.AppMasterToWorker.LaunchExecutor import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterRegistered, ResourceAllocated, WorkerList} import org.apache.gearpump.cluster._ -import org.apache.gearpump.cluster.appmaster.{AppMasterRuntimeEnvironment, AppMasterRuntimeInfo} +import org.apache.gearpump.cluster.appmaster.{AppMasterRuntimeEnvironment, ApplicationRuntimeInfo} import org.apache.gearpump.cluster.scheduler.{Relaxation, Resource, ResourceAllocation, ResourceRequest} import org.apache.gearpump.cluster.worker.WorkerId import org.apache.gearpump.util.ActorSystemBooter.RegisterActorSystem @@ -49,11 +49,11 @@ class DistShellAppMasterSpec extends WordSpec with Matchers with BeforeAndAfter "DistributedShell AppMaster" should { "launch one ShellTask on each worker" in { - val appMasterInfo = AppMasterRuntimeInfo(appId, appName = appId.toString) - val appMasterContext = AppMasterContext(appId, userName, resource, null, appJar, - masterProxy, appMasterInfo) + val appMasterInfo = ApplicationRuntimeInfo(appId, appName = appId.toString) + val appMasterContext = AppMasterContext(appId, userName, resource, null, appJar, masterProxy) TestActorRef[DistShellAppMaster]( - AppMasterRuntimeEnvironment.props(List(masterProxy.path), appDescription, appMasterContext)) + AppMasterRuntimeEnvironment.props(List(masterProxy.path), appDescription, + appMasterContext)) mockMaster.expectMsgType[RegisterAppMaster] mockMaster.reply(AppMasterRegistered(appId)) // The DistributedShell AppMaster asks for worker list from Master. 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/96312a2a/examples/distributeservice/src/test/scala/org/apache/gearpump/experiments/distributeservice/DistServiceAppMasterSpec.scala ---------------------------------------------------------------------- diff --git a/examples/distributeservice/src/test/scala/org/apache/gearpump/experiments/distributeservice/DistServiceAppMasterSpec.scala b/examples/distributeservice/src/test/scala/org/apache/gearpump/experiments/distributeservice/DistServiceAppMasterSpec.scala index 7516138..b78bfc2 100644 --- a/examples/distributeservice/src/test/scala/org/apache/gearpump/experiments/distributeservice/DistServiceAppMasterSpec.scala +++ b/examples/distributeservice/src/test/scala/org/apache/gearpump/experiments/distributeservice/DistServiceAppMasterSpec.scala @@ -27,7 +27,7 @@ import org.scalatest.{BeforeAndAfter, Matchers, WordSpec} import org.apache.gearpump.cluster.AppMasterToMaster.{GetAllWorkers, RegisterAppMaster, RequestResource} import org.apache.gearpump.cluster.AppMasterToWorker.LaunchExecutor import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterRegistered, ResourceAllocated, WorkerList} -import org.apache.gearpump.cluster.appmaster.{AppMasterRuntimeEnvironment, AppMasterRuntimeInfo} +import org.apache.gearpump.cluster.appmaster.{AppMasterRuntimeEnvironment, ApplicationRuntimeInfo} import org.apache.gearpump.cluster.scheduler.{Relaxation, Resource, ResourceAllocation, ResourceRequest} import org.apache.gearpump.cluster.worker.WorkerId import org.apache.gearpump.cluster.{AppDescription, AppMasterContext, TestUtil, UserConfig} @@ -52,11 +52,10 @@ class DistServiceAppMasterSpec extends WordSpec with Matchers with BeforeAndAfte "DistService AppMaster" should { "responsable for service distributing" in { - val appMasterInfo = AppMasterRuntimeInfo(appId, "appName", mockWorker1.ref) - val appMasterContext = AppMasterContext(appId, userName, resource, null, appJar, masterProxy, - appMasterInfo) + val 
appMasterContext = AppMasterContext(appId, userName, resource, null, appJar, masterProxy) TestActorRef[DistServiceAppMaster]( - AppMasterRuntimeEnvironment.props(List(masterProxy.path), appDescription, appMasterContext)) + AppMasterRuntimeEnvironment.props(List(masterProxy.path), appDescription, + appMasterContext)) val registerAppMaster = mockMaster.receiveOne(15.seconds) assert(registerAppMaster.isInstanceOf[RegisterAppMaster]) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/96312a2a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala index df1de06..e2d421c 100644 --- a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala +++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala @@ -21,11 +21,11 @@ package org.apache.gearpump.experiments.storm.main import java.io.{File, FileOutputStream, FileWriter} import java.nio.ByteBuffer import java.nio.channels.{Channels, WritableByteChannel} -import java.util.{HashMap => JHashMap, Map => JMap, UUID} +import java.util.{UUID, HashMap => JHashMap, Map => JMap} + import scala.collection.JavaConverters._ import scala.concurrent.duration.Duration import scala.concurrent.{Await, Future} - import akka.actor.ActorSystem import com.typesafe.config.ConfigValueFactory import backtype.storm.Config @@ -35,10 +35,9 @@ import backtype.storm.utils.Utils import org.apache.storm.shade.org.json.simple.JSONValue import org.apache.storm.shade.org.yaml.snakeyaml.Yaml import org.slf4j.Logger - import org.apache.gearpump.cluster.client.ClientContext import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption} -import 
org.apache.gearpump.cluster.{MasterToAppMaster, UserConfig} +import org.apache.gearpump.cluster.{ApplicationStatus, MasterToAppMaster, UserConfig} import org.apache.gearpump.experiments.storm.topology.GearpumpStormTopology import org.apache.gearpump.experiments.storm.util.TimeCacheMapWrapper.Callback import org.apache.gearpump.experiments.storm.util.{GraphBuilder, StormConstants, StormUtil, TimeCacheMapWrapper} @@ -271,7 +270,7 @@ class GearpumpNimbus(clientContext: ClientContext, stormConf: JMap[AnyRef, AnyRe clientContext.listApps.appMasters.foreach { app => val name = app.appName if (applications.contains(name)) { - if (app.status != MasterToAppMaster.AppMasterActive) { + if (app.status != ApplicationStatus.ACTIVE) { removeTopology(name) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/96312a2a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/TestSpecBase.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/TestSpecBase.scala b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/TestSpecBase.scala index a00495a..f863a97 100644 --- a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/TestSpecBase.scala +++ b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/TestSpecBase.scala @@ -18,8 +18,7 @@ package org.apache.gearpump.integrationtest import org.scalatest._ - -import org.apache.gearpump.cluster.MasterToAppMaster +import org.apache.gearpump.cluster.{ApplicationStatus, MasterToAppMaster} import org.apache.gearpump.cluster.MasterToAppMaster.AppMasterData import org.apache.gearpump.util.LogUtil @@ -87,7 +86,7 @@ trait TestSpecBase def expectAppIsRunning(appId: Int, expectedAppName: String): Unit = { Util.retryUntil(() => { val app = restClient.queryApp(appId) - app.status == MasterToAppMaster.AppMasterActive && app.appName == expectedAppName + app.status == 
ApplicationStatus.ACTIVE && app.appName == expectedAppName }, s"$expectedAppName is running") } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/96312a2a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/CommandLineSpec.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/CommandLineSpec.scala b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/CommandLineSpec.scala index 3fa6f6a..a147377 100644 --- a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/CommandLineSpec.scala +++ b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/CommandLineSpec.scala @@ -17,8 +17,8 @@ */ package org.apache.gearpump.integrationtest.checklist -import org.apache.gearpump.cluster.MasterToAppMaster -import org.apache.gearpump.integrationtest.{Util, TestSpecBase} +import org.apache.gearpump.cluster.{ApplicationStatus, MasterToAppMaster} +import org.apache.gearpump.integrationtest.{TestSpecBase, Util} /** * The test spec checks the command-line usage @@ -129,7 +129,7 @@ class CommandLineSpec extends TestSpecBase { val actual = commandLineClient.queryApp(appId) actual.contains(s"application: $appId, ") && actual.contains(s"name: $expectedName, ") && - actual.contains(s"status: ${MasterToAppMaster.AppMasterActive}") + actual.contains(s"status: ${ApplicationStatus.ACTIVE.status}") }, "application is running") } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/96312a2a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/RestServiceSpec.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/RestServiceSpec.scala 
b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/RestServiceSpec.scala index 8b5b82a..c1c7b84 100644 --- a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/RestServiceSpec.scala +++ b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/RestServiceSpec.scala @@ -18,8 +18,7 @@ package org.apache.gearpump.integrationtest.checklist import scala.concurrent.duration._ - -import org.apache.gearpump.cluster.MasterToAppMaster +import org.apache.gearpump.cluster.{ApplicationStatus, MasterToAppMaster} import org.apache.gearpump.cluster.master.MasterStatus import org.apache.gearpump.cluster.worker.{WorkerId, WorkerSummary} import org.apache.gearpump.integrationtest.{TestSpecBase, Util} @@ -205,7 +204,7 @@ class RestServiceSpec extends TestSpecBase { runningWorkers.length == expectedWorkersCount }, "all workers running") runningWorkers.foreach { worker => - worker.state shouldEqual MasterToAppMaster.AppMasterActive + worker.state shouldEqual "active" } } @@ -341,7 +340,7 @@ class RestServiceSpec extends TestSpecBase { Util.retryUntil(() => restClient.restartApp(originAppId), "app restarted") val killedApp = restClient.queryApp(originAppId) killedApp.appId shouldEqual originAppId - killedApp.status shouldEqual MasterToAppMaster.AppMasterInActive + killedApp.status shouldEqual ApplicationStatus.TERMINATED val newAppId = originAppId + 1 expectAppIsRunning(newAppId, wordCountName) val runningApps = restClient.listRunningApps() @@ -360,7 +359,7 @@ class RestServiceSpec extends TestSpecBase { val actualApp = restClient.queryApp(appId) actualApp.appId shouldEqual appId - actualApp.status shouldEqual MasterToAppMaster.AppMasterInActive + actualApp.status shouldEqual ApplicationStatus.TERMINATED } private def expectMetricsAvailable(condition: => Boolean, conditionDescription: String): Unit = { 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/96312a2a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/StabilitySpec.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/StabilitySpec.scala b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/StabilitySpec.scala index 4b15055..4ece4c0 100644 --- a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/StabilitySpec.scala +++ b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/StabilitySpec.scala @@ -18,8 +18,7 @@ package org.apache.gearpump.integrationtest.checklist import scala.concurrent.duration.Duration - -import org.apache.gearpump.cluster.MasterToAppMaster +import org.apache.gearpump.cluster.{ApplicationStatus, MasterToAppMaster} import org.apache.gearpump.cluster.worker.WorkerId import org.apache.gearpump.integrationtest.{TestSpecBase, Util} import org.apache.gearpump.util.{Constants, LogUtil} @@ -48,7 +47,7 @@ class StabilitySpec extends TestSpecBase { // verify val laterAppMaster = restClient.queryStreamingAppDetail(appId) - laterAppMaster.status shouldEqual MasterToAppMaster.AppMasterActive + laterAppMaster.status shouldEqual ApplicationStatus.ACTIVE laterAppMaster.clock should be > 0L } } @@ -70,7 +69,7 @@ class StabilitySpec extends TestSpecBase { // verify val laterAppMaster = restClient.queryStreamingAppDetail(appId) - laterAppMaster.status shouldEqual MasterToAppMaster.AppMasterActive + laterAppMaster.status shouldEqual ApplicationStatus.ACTIVE laterAppMaster.clock should be > 0L } } @@ -129,7 +128,7 @@ class StabilitySpec extends TestSpecBase { // verify val laterAppMaster = restClient.queryStreamingAppDetail(appId) - laterAppMaster.status shouldEqual MasterToAppMaster.AppMasterActive + laterAppMaster.status shouldEqual ApplicationStatus.ACTIVE 
laterAppMaster.clock should be > 0L } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/96312a2a/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/CommandLineClient.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/CommandLineClient.scala b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/CommandLineClient.scala index 884a8d1..2247cf3 100644 --- a/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/CommandLineClient.scala +++ b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/CommandLineClient.scala @@ -18,8 +18,7 @@ package org.apache.gearpump.integrationtest.minicluster import org.apache.log4j.Logger - -import org.apache.gearpump.cluster.MasterToAppMaster +import org.apache.gearpump.cluster.ApplicationStatus import org.apache.gearpump.integrationtest.Docker /** @@ -36,14 +35,10 @@ class CommandLineClient(host: String) { } def listRunningApps(): Array[String] = - listApps().filter( - _.contains(s", status: ${MasterToAppMaster.AppMasterActive}") - ) + listApps().filter(_.contains(s", status: ${ApplicationStatus.ACTIVE}")) def queryApp(appId: Int): String = try { - listApps().filter( - _.startsWith(s"application: $appId") - ).head + listApps().filter(_.startsWith(s"application: $appId")).head } catch { case ex: Throwable => LOG.warn(s"swallowed an exception: $ex") http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/96312a2a/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/RestClient.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/RestClient.scala 
b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/RestClient.scala index 8fa0679..b8241af 100644 --- a/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/RestClient.scala +++ b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/RestClient.scala @@ -24,12 +24,13 @@ import org.apache.log4j.Logger import upickle.Js import upickle.default._ +import org.apache.gearpump.cluster.ApplicationStatus._ import org.apache.gearpump.cluster.AppMasterToMaster.MasterData import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMastersData} import org.apache.gearpump.cluster.MasterToClient.HistoryMetrics import org.apache.gearpump.cluster.master.MasterSummary import org.apache.gearpump.cluster.worker.{WorkerId, WorkerSummary} -import org.apache.gearpump.cluster.{AppJar, MasterToAppMaster} +import org.apache.gearpump.cluster.AppJar import org.apache.gearpump.integrationtest.{Docker, Util} import org.apache.gearpump.services.AppMasterService.Status import org.apache.gearpump.services.MasterService.{AppSubmissionResult, BuiltinPartitioners} @@ -78,7 +79,7 @@ class RestClient(host: String, port: Int) { } def listRunningWorkers(): Array[WorkerSummary] = { - listWorkers().filter(_.state == MasterToAppMaster.AppMasterActive) + listWorkers().filter(_.state == ACTIVE.status) } def listApps(): Array[AppMasterData] = { @@ -87,12 +88,12 @@ class RestClient(host: String, port: Int) { } def listPendingOrRunningApps(): Array[AppMasterData] = { - listApps().filter(app => app.status == MasterToAppMaster.AppMasterActive - || app.status == MasterToAppMaster.AppMasterPending) + listApps().filter(app => app.status == ACTIVE + || app.status == PENDING) } def listRunningApps(): Array[AppMasterData] = { - listApps().filter(_.status == MasterToAppMaster.AppMasterActive) + listApps().filter(_.status == ACTIVE) } def getNextAvailableAppId(): Int = { 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/96312a2a/services/jvm/src/main/scala/org/apache/gearpump/services/util/UpickleUtil.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/util/UpickleUtil.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/util/UpickleUtil.scala index caa3a33..b5b0293 100644 --- a/services/jvm/src/main/scala/org/apache/gearpump/services/util/UpickleUtil.scala +++ b/services/jvm/src/main/scala/org/apache/gearpump/services/util/UpickleUtil.scala @@ -18,8 +18,8 @@ package org.apache.gearpump.services.util +import org.apache.gearpump.cluster.ApplicationStatus import upickle.Js - import org.apache.gearpump.cluster.worker.WorkerId import org.apache.gearpump.util.Graph @@ -37,6 +37,19 @@ object UpickleUtil { } } + implicit val appStatusReader: upickle.default.Reader[ApplicationStatus] = + upickle.default.Reader[ApplicationStatus] { + case Js.Str(str) => + str match { + case "pending" => ApplicationStatus.PENDING + case "active" => ApplicationStatus.ACTIVE + case "succeeded" => ApplicationStatus.SUCCEEDED + case "failed" => ApplicationStatus.FAILED + case "terminated" => ApplicationStatus.TERMINATED + case _ => ApplicationStatus.NONEXIST + } + } + implicit val workerIdReader: upickle.default.Reader[WorkerId] = upickle.default.Reader[WorkerId] { case Js.Str(str) => WorkerId.parse(str) @@ -46,4 +59,10 @@ object UpickleUtil { case workerId: WorkerId => Js.Str(WorkerId.render(workerId)) } + + implicit val appStatusWriter: upickle.default.Writer[ApplicationStatus] = + upickle.default.Writer[ApplicationStatus] { + case status: ApplicationStatus => + Js.Str(status.toString) + } } \ No newline at end of file
