GEARPUMP-3, Define REST API to add/remove worker instances, which allows us to scale out in YARN.
Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/77a5bf77 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/77a5bf77 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/77a5bf77 Branch: refs/heads/master Commit: 77a5bf779361b20b0389000bf0c6edb3788d0b59 Parents: 7d42d4c Author: Sean Zhong <[email protected]> Authored: Wed Mar 30 14:47:38 2016 +0800 Committer: manuzhang <[email protected]> Committed: Tue Apr 26 14:24:11 2016 +0800 ---------------------------------------------------------------------- core/src/main/resources/geardefault.conf | 15 ++++ .../gearpump/cluster/master/MasterSummary.scala | 7 +- .../gearpump/cluster/worker/WorkerSummary.scala | 4 +- .../main/scala/io/gearpump/util/Constants.scala | 9 ++ .../io/gearpump/cluster/master/Master.scala | 6 +- .../io/gearpump/cluster/worker/Worker.scala | 1 + docs/dev-rest-api.md | 87 +++++++++++++++++--- .../experiments/yarn/appmaster/Command.scala | 2 + services/dashboard/services/models/models.js | 6 +- .../io/gearpump/services/RestServices.scala | 10 ++- .../gearpump/services/SupervisorService.scala | 86 ++++++++++++------- 11 files changed, 182 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/77a5bf77/core/src/main/resources/geardefault.conf ---------------------------------------------------------------------- diff --git a/core/src/main/resources/geardefault.conf b/core/src/main/resources/geardefault.conf index 394a928..663c244 100644 --- a/core/src/main/resources/geardefault.conf +++ b/core/src/main/resources/geardefault.conf @@ -43,6 +43,21 @@ gearpump { ## Number of executors to launch when starting an application application.executor-num = 1 + + ## Unique Id to identity this worker instance in low level resource manager like YARN. 
+ ## + ## This value is typically configured by resource manager integration module, like gearpump-yarn in this case. + ## In YARN, the value is the container Id. The default value is empty, which means we don't have a low level + ## resource manager. + worker-resource-manager-container-id = "" + + ## Unique Id to identity this Master instance in low level resource manager like YARN. + ## + ## This value is typically configured by resource manager integration module, like gearpump-yarn in this case. + ## In YARN, the value is the container Id. The default value is empty, which means we don't have a low level + ## resource manager. + master-resource-manager-container-id = "" + ## To enable worker use cgroup to make resource isolation, ## set gearpump.worker.executor-process-launcher = "io.gearpump.cluster.worker.CGroupProcessLauncher" ## http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/77a5bf77/core/src/main/scala/io/gearpump/cluster/master/MasterSummary.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/cluster/master/MasterSummary.scala b/core/src/main/scala/io/gearpump/cluster/master/MasterSummary.scala index e78c6a7..39e4a41 100644 --- a/core/src/main/scala/io/gearpump/cluster/master/MasterSummary.scala +++ b/core/src/main/scala/io/gearpump/cluster/master/MasterSummary.scala @@ -25,12 +25,15 @@ object MasterStatus { val UnSynced = "unsynced" } + +case class MasterNode(host: String, port: Int) + /** * Master information for REST API call */ case class MasterSummary( - leader: (String, Int), - cluster: List[(String, Int)], + leader: MasterNode, + cluster: List[MasterNode], aliveFor: Long, logFile: String, jarStore: String, http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/77a5bf77/core/src/main/scala/io/gearpump/cluster/worker/WorkerSummary.scala ---------------------------------------------------------------------- diff --git 
a/core/src/main/scala/io/gearpump/cluster/worker/WorkerSummary.scala b/core/src/main/scala/io/gearpump/cluster/worker/WorkerSummary.scala index 04e1a8c..b1f9e6f 100644 --- a/core/src/main/scala/io/gearpump/cluster/worker/WorkerSummary.scala +++ b/core/src/main/scala/io/gearpump/cluster/worker/WorkerSummary.scala @@ -34,10 +34,12 @@ case class WorkerSummary( availableSlots: Int, homeDirectory: String, jvmName: String, + // Id used to uniquely identity this worker process in low level resource manager like YARN. + resourceManagerContainerId: String, historyMetricsConfig: HistoryMetricsConfig = null) object WorkerSummary{ - def empty = WorkerSummary(-1, "", "", 0L, "", Array.empty[ExecutorSlots], 0, 0, "", jvmName = "") + def empty = WorkerSummary(-1, "", "", 0L, "", Array.empty[ExecutorSlots], 0, 0, "", jvmName = "", resourceManagerContainerId = "") } case class ExecutorSlots(appId: Int, executorId: Int, slots: Int) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/77a5bf77/core/src/main/scala/io/gearpump/util/Constants.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/util/Constants.scala b/core/src/main/scala/io/gearpump/util/Constants.scala index ecf09e4..a4fb545 100644 --- a/core/src/main/scala/io/gearpump/util/Constants.scala +++ b/core/src/main/scala/io/gearpump/util/Constants.scala @@ -57,6 +57,15 @@ object Constants { val GEARPUMP_LOG_APPLICATION_DIR = "gearpump.log.application.dir" val HADOOP_CONF = "hadoopConf" + + // Id used to identity Master JVM process in low level resource manager like YARN. + // In YARN, it means the container Id. + val GEARPUMP_MASTER_RESOURCE_MANAGER_CONTAINER_ID = "gearpump.master-resource-manager-container-id" + + // Id used to identity Worker JVM process in low level resource manager like YARN. + // In YARN, it means the container Id. 
+ val GEARPUMP_WORKER_RESOURCE_MANAGER_CONTAINER_ID = "gearpump.worker-resource-manager-container-id" + // true or false val GEARPUMP_REMOTE_DEBUG_EXECUTOR_JVM = "gearpump.remote-debug-executor-jvm" val GEARPUMP_REMOTE_DEBUG_PORT = "gearpump.remote-debug-port" http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/77a5bf77/daemon/src/main/scala/io/gearpump/cluster/master/Master.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/io/gearpump/cluster/master/Master.scala b/daemon/src/main/scala/io/gearpump/cluster/master/Master.scala index f22a300..8db6758 100644 --- a/daemon/src/main/scala/io/gearpump/cluster/master/Master.scala +++ b/daemon/src/main/scala/io/gearpump/cluster/master/Master.scala @@ -185,12 +185,12 @@ private[cluster] class Master extends Actor with Stash { case GetMasterData => val aliveFor = System.currentTimeMillis() - birth val logFileDir = LogUtil.daemonLogDir(systemConfig).getAbsolutePath - val userDir = System.getProperty("user.dir"); + val userDir = System.getProperty("user.dir") val masterDescription = MasterSummary( - hostPort.toTuple, - getMasterClusterList.map(_.toTuple), + MasterNode(hostPort.host, hostPort.port), + getMasterClusterList.map(hostPort => MasterNode(hostPort.host, hostPort.port)), aliveFor, logFileDir, jarStoreRootPath, http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/77a5bf77/daemon/src/main/scala/io/gearpump/cluster/worker/Worker.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/io/gearpump/cluster/worker/Worker.scala b/daemon/src/main/scala/io/gearpump/cluster/worker/Worker.scala index 399bd31..8957dff 100644 --- a/daemon/src/main/scala/io/gearpump/cluster/worker/Worker.scala +++ b/daemon/src/main/scala/io/gearpump/cluster/worker/Worker.scala @@ -194,6 +194,7 @@ private[cluster] class Worker(masterProxy : ActorRef) extends Actor with TimeOut resource.slots, userDir, jvmName = 
ManagementFactory.getRuntimeMXBean().getName(), + resourceManagerContainerId = systemConfig.getString(Constants.GEARPUMP_WORKER_RESOURCE_MANAGER_CONTAINER_ID), historyMetricsConfig = getHistoryMetricsConfig) ) case ChangeExecutorResource(appId, executorId, usedResource) => http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/77a5bf77/docs/dev-rest-api.md ---------------------------------------------------------------------- diff --git a/docs/dev-rest-api.md b/docs/dev-rest-api.md index e4b8793..7fc718a 100644 --- a/docs/dev-rest-api.md +++ b/docs/dev-rest-api.md @@ -112,16 +112,8 @@ Sample Response: ``` { "masterDescription": { - "leader": [ - "[email protected]", - 3000 - ], - "cluster": [ - [ - "127.0.0.1", - 3000 - ] - ], + "leader":{"host":"[email protected]","port":3000}, + "cluster":[{"host":"127.0.0.1","port":3000}], "aliveFor": "642941", "logFile": "/Users/foobar/gearpump/logs", "jarStore": "jarstore/", @@ -568,6 +560,81 @@ Sample Response: } ``` +## Supervisor Service + +Supervisor service allows user to add or remove a worker machine. + +### POST api/v1.0/supervisor/status +Query whether the supervisor service is enabled. If Supervisor service is disabled, you are not allowed to use API like addworker/removeworker. + +Example: + +```bash +curl -X POST [--cookie outputAuthenticationCookie.txt] http://127.0.0.1:8090/api/v1.0/supervisor/status +``` + +Sample Response: + +``` +{"enabled":true} +``` + + + +### GET api/v1.0/supervisor +Get the supervisor path + +Example: + +```bash +curl [--cookie outputAuthenticationCookie.txt] http://127.0.0.1:8090/api/v1.0/supervisor +``` + +Sample Response: + +``` +{path: "supervisor actor path"} +``` + +### POST api/v1.0/supervisor/addworker/<worker-count> +Add workerCount new workers in the cluster. It will use the low level resource scheduler like +YARN to start new containers and then boot Gearpump worker process. 
+ +Example: + +```bash +curl -X POST [--cookie outputAuthenticationCookie.txt] http://127.0.0.1:8090/api/v1.0/supervisor/addworker/2 + +``` + +Sample Response: + +``` +{success: true} +``` + +### POST api/v1.0/supervisor/removeworker/<worker-id> +Remove single worker instance by specifying a worker Id. + + +**NOTE:** Use with caution! + +**NOTE:** All executor JVMs under this worker JVM will also be destroyed. It will trigger failover for all +applications that have executor started under this worker. + +Example: + +```bash +curl -X POST [--cookie outputAuthenticationCookie.txt] http://127.0.0.1:8090/api/v1.0/supervisor/removeworker/3 + +``` + +Sample Response: + +``` +{success: true} +``` + ## Application service http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/77a5bf77/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/Command.scala ---------------------------------------------------------------------- diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/Command.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/Command.scala index 21077bc..6c38bde 100644 --- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/Command.scala +++ b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/Command.scala @@ -65,6 +65,7 @@ case class MasterCommand(config: Config, version: String, masterAddr: HostPort) val properties = Array( s"-D${Constants.GEARPUMP_CLUSTER_MASTERS}.0=${masterAddr.host}:${masterAddr.port}", s"-D${Constants.GEARPUMP_HOSTNAME}=${masterAddr.host}", + s"-D${Constants.GEARPUMP_MASTER_RESOURCE_MANAGER_CONTAINER_ID}=${CONTAINER_ID}", s"-D${Constants.GEARPUMP_HOME}=${LOCAL_DIRS}/${CONTAINER_ID}/pack/$version", s"-D${Constants.GEARPUMP_LOG_DAEMON_DIR}=${LOG_DIR_EXPANSION_VAR}", s"-D${Constants.GEARPUMP_LOG_APPLICATION_DIR}=${LOG_DIR_EXPANSION_VAR}") @@ -79,6 +80,7 @@ case class WorkerCommand(config: Config, version: String, masterAddr: 
HostPort, val properties = Array( s"-D${Constants.GEARPUMP_CLUSTER_MASTERS}.0=${masterAddr.host}:${masterAddr.port}", s"-D${Constants.GEARPUMP_LOG_DAEMON_DIR}=${LOG_DIR_EXPANSION_VAR}", + s"-D${Constants.GEARPUMP_WORKER_RESOURCE_MANAGER_CONTAINER_ID}=${CONTAINER_ID}", s"-D${Constants.GEARPUMP_HOME}=${LOCAL_DIRS}/${CONTAINER_ID}/pack/$version", s"-D${Constants.GEARPUMP_LOG_APPLICATION_DIR}=${LOG_DIR_EXPANSION_VAR}", s"-D${Constants.GEARPUMP_HOSTNAME}=$workerHost") http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/77a5bf77/services/dashboard/services/models/models.js ---------------------------------------------------------------------- diff --git a/services/dashboard/services/models/models.js b/services/dashboard/services/models/models.js index fabde06..f05ff59 100644 --- a/services/dashboard/services/models/models.js +++ b/services/dashboard/services/models/models.js @@ -88,11 +88,11 @@ angular.module('io.gearpump.models', []) var obj = wrapper.masterDescription; angular.merge(obj, { // upickle conversion - cluster: _.map(obj.cluster, function(tuple) { - return tuple.join(':'); + cluster: _.map(obj.cluster, function(node) { + return node.host + ":" + node.port; }), jvm: decoder._jvm(obj.jvmName), - leader: obj.leader.join(':'), + leader: obj.leader.host + ":" + obj.leader.port, // extra properties/methods isHealthy: obj.masterStatus === 'synced', configLink: restapi.masterConfigLink() http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/77a5bf77/services/jvm/src/main/scala/io/gearpump/services/RestServices.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/main/scala/io/gearpump/services/RestServices.scala b/services/jvm/src/main/scala/io/gearpump/services/RestServices.scala index f2761d6..a44e0a9 100644 --- a/services/jvm/src/main/scala/io/gearpump/services/RestServices.scala +++ b/services/jvm/src/main/scala/io/gearpump/services/RestServices.scala @@ -60,8 +60,12 @@ class 
RestServices(master: ActorRef, mat: ActorMaterializer, system: ActorSystem private val static = new StaticService(system, supervisorPath).route def supervisor: ActorRef = { - val actorRef = system.actorSelection(supervisorPath).resolveOne() - Await.result(actorRef, new Timeout(Duration.create(5, "seconds")).duration) + if (supervisorPath == null || supervisorPath.isEmpty()) { + null + } else { + val actorRef = system.actorSelection(supervisorPath).resolveOne() + Await.result(actorRef, new Timeout(Duration.create(5, "seconds")).duration) + } } override def route: Route = { @@ -83,7 +87,7 @@ class RestServices(master: ActorRef, mat: ActorMaterializer, system: ActorSystem val masterService = new MasterService(master, jarStoreService, system) val worker = new WorkerService(master, system) val app = new AppMasterService(master, jarStoreService, system) - val sup = new SupervisorService(supervisor, system) + val sup = new SupervisorService(master, supervisor, system) new RouteService { override def route: Route = { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/77a5bf77/services/jvm/src/main/scala/io/gearpump/services/SupervisorService.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/main/scala/io/gearpump/services/SupervisorService.scala b/services/jvm/src/main/scala/io/gearpump/services/SupervisorService.scala index b3b1479..9ecd078 100644 --- a/services/jvm/src/main/scala/io/gearpump/services/SupervisorService.scala +++ b/services/jvm/src/main/scala/io/gearpump/services/SupervisorService.scala @@ -20,63 +20,91 @@ package io.gearpump.services import akka.actor.{ActorRef, ActorSystem} import akka.http.scaladsl.server.Directives._ +import akka.http.scaladsl.server.Route import akka.stream.Materializer +import io.gearpump.cluster.AppMasterToMaster.{GetWorkerData, WorkerData, GetAllWorkers} import io.gearpump.cluster.ClientToMaster._ +import io.gearpump.services.SupervisorService.{Path, Status} 
import io.gearpump.util.ActorUtil._ +import scala.concurrent.Future import scala.util.{Failure, Success} +import upickle.default.{read, write} - -class SupervisorService(val supervisor: ActorRef, override val system: ActorSystem) +class SupervisorService(val master: ActorRef, val supervisor: ActorRef, override val system: ActorSystem) extends BasicService { import upickle.default.write + /** + * TODO: Add additional check to ensure the user have enough authorization to add/remove a worker machine + */ + private def authorize(internal: Route): Route = { + if (supervisor == null) { + failWith(new Exception("API not enabled, cannot find a valid supervisor! Please make sure Gearpump is " + + "running on top of YARN or other resource managers")) + } else { + internal + } + } + override def doRoute(implicit mat: Materializer) = pathPrefix("supervisor") { pathEnd { get { - complete(write(supervisor.path.toString)) + val path = if (supervisor == null) { + null + } else { + supervisor.path.toString + } + complete(write(Path(path))) } } ~ - path("addworker" / IntNumber) { count => + path("status") { post { - onComplete(askActor[CommandResult](supervisor, AddWorker(count))) { - case Success(value) => - complete(write(value)) - case Failure(ex) => - failWith(ex) + if (supervisor == null) { + complete(write(Status(enabled = false))) + } else { + complete(write(Status(enabled = true))) } } } ~ - path("addmaster") { - post { - onComplete(askActor[CommandResult](supervisor, AddMaster)) { + path("addworker" / IntNumber) { workerCount => + post { + authorize { + onComplete(askActor[CommandResult](supervisor, AddWorker(workerCount))) { case Success(value) => complete(write(value)) case Failure(ex) => failWith(ex) } } - } ~ - path("removemaster" / Segment) { uriPath => - post { - onComplete(askActor[CommandResult](supervisor, RemoveMaster(uriPath))) { - case Success(value) => - complete(write(value)) - case Failure(ex) => - failWith(ex) + } + } ~ + path("removeworker" / IntNumber) { 
workerId => + post { + authorize { + + def future(): Future[CommandResult] = { + askWorker[WorkerData](master, workerId, GetWorkerData(workerId)).flatMap{workerData => + val containerId = workerData.workerDescription.resourceManagerContainerId + askActor[CommandResult](supervisor, RemoveWorker(containerId)) } } - } ~ - path("removeworker" / Segment) { uriPath => - post { - onComplete(askActor[CommandResult](supervisor, RemoveWorker(uriPath))) { - case Success(value) => - complete(write(value)) - case Failure(ex) => - failWith(ex) - } + + onComplete[CommandResult](future()) { + case Success(value) => + complete(write(value)) + case Failure(ex) => + failWith(ex) } } + } + } } } + +object SupervisorService{ + case class Status(enabled: Boolean) + + case class Path(path: String) +}
