GEARPUMP-3, Define REST API to add/remove worker instances, which allow us to 
scale out in YARN.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/77a5bf77
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/77a5bf77
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/77a5bf77

Branch: refs/heads/master
Commit: 77a5bf779361b20b0389000bf0c6edb3788d0b59
Parents: 7d42d4c
Author: Sean Zhong <[email protected]>
Authored: Wed Mar 30 14:47:38 2016 +0800
Committer: manuzhang <[email protected]>
Committed: Tue Apr 26 14:24:11 2016 +0800

----------------------------------------------------------------------
 core/src/main/resources/geardefault.conf        | 15 ++++
 .../gearpump/cluster/master/MasterSummary.scala |  7 +-
 .../gearpump/cluster/worker/WorkerSummary.scala |  4 +-
 .../main/scala/io/gearpump/util/Constants.scala |  9 ++
 .../io/gearpump/cluster/master/Master.scala     |  6 +-
 .../io/gearpump/cluster/worker/Worker.scala     |  1 +
 docs/dev-rest-api.md                            | 87 +++++++++++++++++---
 .../experiments/yarn/appmaster/Command.scala    |  2 +
 services/dashboard/services/models/models.js    |  6 +-
 .../io/gearpump/services/RestServices.scala     | 10 ++-
 .../gearpump/services/SupervisorService.scala   | 86 ++++++++++++-------
 11 files changed, 182 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/77a5bf77/core/src/main/resources/geardefault.conf
----------------------------------------------------------------------
diff --git a/core/src/main/resources/geardefault.conf 
b/core/src/main/resources/geardefault.conf
index 394a928..663c244 100644
--- a/core/src/main/resources/geardefault.conf
+++ b/core/src/main/resources/geardefault.conf
@@ -43,6 +43,21 @@ gearpump {
   ## Number of executors to launch when starting an application
   application.executor-num = 1
 
+
+  ## Unique Id to identify this worker instance in low level resource manager 
like YARN.
+  ##
+  ## This value is typically configured by resource manager integration 
module, like gearpump-yarn in this case.
+  ## In YARN, the value is the container Id. The default value is empty, which 
means we don't have a low level
+  ## resource manager.
+  worker-resource-manager-container-id = ""
+
+  ## Unique Id to identify this Master instance in low level resource manager 
like YARN.
+  ##
+  ## This value is typically configured by resource manager integration 
module, like gearpump-yarn in this case.
+  ## In YARN, the value is the container Id. The default value is empty, which 
means we don't have a low level
+  ## resource manager.
+  master-resource-manager-container-id = ""
+
   ## To enable worker use cgroup to make resource isolation,
   ## set gearpump.worker.executor-process-launcher = 
"io.gearpump.cluster.worker.CGroupProcessLauncher"
   ##

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/77a5bf77/core/src/main/scala/io/gearpump/cluster/master/MasterSummary.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/master/MasterSummary.scala 
b/core/src/main/scala/io/gearpump/cluster/master/MasterSummary.scala
index e78c6a7..39e4a41 100644
--- a/core/src/main/scala/io/gearpump/cluster/master/MasterSummary.scala
+++ b/core/src/main/scala/io/gearpump/cluster/master/MasterSummary.scala
@@ -25,12 +25,15 @@ object MasterStatus {
   val UnSynced = "unsynced"
 }
 
+
+case class MasterNode(host: String, port: Int)
+
 /**
  * Master information for REST API call
  */
 case class MasterSummary(
-  leader: (String, Int),
-  cluster: List[(String, Int)],
+  leader: MasterNode,
+  cluster: List[MasterNode],
   aliveFor: Long,
   logFile: String,
   jarStore: String,

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/77a5bf77/core/src/main/scala/io/gearpump/cluster/worker/WorkerSummary.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/worker/WorkerSummary.scala 
b/core/src/main/scala/io/gearpump/cluster/worker/WorkerSummary.scala
index 04e1a8c..b1f9e6f 100644
--- a/core/src/main/scala/io/gearpump/cluster/worker/WorkerSummary.scala
+++ b/core/src/main/scala/io/gearpump/cluster/worker/WorkerSummary.scala
@@ -34,10 +34,12 @@ case class WorkerSummary(
   availableSlots: Int,
   homeDirectory: String,
   jvmName: String,
+  // Id used to uniquely identify this worker process in low level resource 
manager like YARN.
+  resourceManagerContainerId: String,
   historyMetricsConfig: HistoryMetricsConfig = null)
 
 object WorkerSummary{
-  def empty = WorkerSummary(-1, "", "", 0L, "", Array.empty[ExecutorSlots], 0, 
0, "", jvmName = "")
+  def empty = WorkerSummary(-1, "", "", 0L, "", Array.empty[ExecutorSlots], 0, 
0, "", jvmName = "", resourceManagerContainerId = "")
 }
 
 case class ExecutorSlots(appId: Int, executorId: Int, slots: Int)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/77a5bf77/core/src/main/scala/io/gearpump/util/Constants.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/util/Constants.scala 
b/core/src/main/scala/io/gearpump/util/Constants.scala
index ecf09e4..a4fb545 100644
--- a/core/src/main/scala/io/gearpump/util/Constants.scala
+++ b/core/src/main/scala/io/gearpump/util/Constants.scala
@@ -57,6 +57,15 @@ object Constants {
   val GEARPUMP_LOG_APPLICATION_DIR = "gearpump.log.application.dir"
   val HADOOP_CONF = "hadoopConf"
 
+
+  // Id used to identify Master JVM process in low level resource manager like 
YARN.
+  // In YARN, it means the container Id.
+  val GEARPUMP_MASTER_RESOURCE_MANAGER_CONTAINER_ID = 
"gearpump.master-resource-manager-container-id"
+
+  // Id used to identify Worker JVM process in low level resource manager like 
YARN.
+  // In YARN, it means the container Id.
+  val GEARPUMP_WORKER_RESOURCE_MANAGER_CONTAINER_ID = 
"gearpump.worker-resource-manager-container-id"
+
   // true or false
   val GEARPUMP_REMOTE_DEBUG_EXECUTOR_JVM = "gearpump.remote-debug-executor-jvm"
   val GEARPUMP_REMOTE_DEBUG_PORT = "gearpump.remote-debug-port"

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/77a5bf77/daemon/src/main/scala/io/gearpump/cluster/master/Master.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/cluster/master/Master.scala 
b/daemon/src/main/scala/io/gearpump/cluster/master/Master.scala
index f22a300..8db6758 100644
--- a/daemon/src/main/scala/io/gearpump/cluster/master/Master.scala
+++ b/daemon/src/main/scala/io/gearpump/cluster/master/Master.scala
@@ -185,12 +185,12 @@ private[cluster] class Master extends Actor with Stash {
     case GetMasterData =>
       val aliveFor = System.currentTimeMillis() - birth
       val logFileDir = LogUtil.daemonLogDir(systemConfig).getAbsolutePath
-      val userDir = System.getProperty("user.dir");
+      val userDir = System.getProperty("user.dir")
 
       val masterDescription =
         MasterSummary(
-          hostPort.toTuple,
-          getMasterClusterList.map(_.toTuple),
+          MasterNode(hostPort.host, hostPort.port),
+          getMasterClusterList.map(hostPort => MasterNode(hostPort.host, 
hostPort.port)),
           aliveFor,
           logFileDir,
           jarStoreRootPath,

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/77a5bf77/daemon/src/main/scala/io/gearpump/cluster/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/cluster/worker/Worker.scala 
b/daemon/src/main/scala/io/gearpump/cluster/worker/Worker.scala
index 399bd31..8957dff 100644
--- a/daemon/src/main/scala/io/gearpump/cluster/worker/Worker.scala
+++ b/daemon/src/main/scala/io/gearpump/cluster/worker/Worker.scala
@@ -194,6 +194,7 @@ private[cluster] class Worker(masterProxy : ActorRef) 
extends Actor with TimeOut
         resource.slots,
         userDir,
         jvmName = ManagementFactory.getRuntimeMXBean().getName(),
+        resourceManagerContainerId = 
systemConfig.getString(Constants.GEARPUMP_WORKER_RESOURCE_MANAGER_CONTAINER_ID),
         historyMetricsConfig = getHistoryMetricsConfig)
       )
     case ChangeExecutorResource(appId, executorId, usedResource) =>

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/77a5bf77/docs/dev-rest-api.md
----------------------------------------------------------------------
diff --git a/docs/dev-rest-api.md b/docs/dev-rest-api.md
index e4b8793..7fc718a 100644
--- a/docs/dev-rest-api.md
+++ b/docs/dev-rest-api.md
@@ -112,16 +112,8 @@ Sample Response:
 ```
 {
   "masterDescription": {
-    "leader": [
-      "[email protected]",
-      3000
-    ],
-    "cluster": [
-      [
-        "127.0.0.1",
-        3000
-      ]
-    ],
+    "leader":{"host":"[email protected]","port":3000},
+    "cluster":[{"host":"127.0.0.1","port":3000}],
     "aliveFor": "642941",
     "logFile": "/Users/foobar/gearpump/logs",
     "jarStore": "jarstore/",
@@ -568,6 +560,81 @@ Sample Response:
 }
 ```
 
+## Supervisor Service
+
+Supervisor service allows user to add or remove a worker machine.
+
+### POST api/v1.0/supervisor/status
+Query whether the supervisor service is enabled. If Supervisor service is 
disabled, you are not allowed to use API like addworker/removeworker.
+
+Example:
+
+```bash
+curl -X POST [--cookie outputAuthenticationCookie.txt] 
http://127.0.0.1:8090/api/v1.0/supervisor/status
+```
+
+Sample Response:
+
+```
+{"enabled":true}
+```
+
+
+
+### GET api/v1.0/supervisor
+Get the supervisor path
+
+Example:
+
+```bash
+curl [--cookie outputAuthenticationCookie.txt] 
http://127.0.0.1:8090/api/v1.0/supervisor
+```
+
+Sample Response:
+
+```
+{"path": "supervisor actor path"}
+```
+
+### POST api/v1.0/supervisor/addworker/&lt;worker-count&gt;
+Add workerCount new workers in the cluster. It will use the low level resource 
scheduler like
+YARN to start new containers and then boot Gearpump worker process.
+
+Example:
+
+```bash
+curl -X POST [--cookie outputAuthenticationCookie.txt] 
http://127.0.0.1:8090/api/v1.0/supervisor/addworker/2
+
+```
+
+Sample Response:
+
+```
+{"success": true}
+```
+
+### POST api/v1.0/supervisor/removeworker/&lt;worker-id&gt;
+Remove single worker instance by specifying a worker Id.
+
+
+**NOTE:** Use with caution!
+
+**NOTE:** All executor JVMs under this worker JVM will also be destroyed. It 
will trigger failover for all
+applications that have executor started under this worker.
+
+Example:
+
+```bash
+curl -X POST [--cookie outputAuthenticationCookie.txt] 
http://127.0.0.1:8090/api/v1.0/supervisor/removeworker/3
+
+```
+
+Sample Response:
+
+```
+{"success": true}
+```
+
 ## Application service
 
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/77a5bf77/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/Command.scala
----------------------------------------------------------------------
diff --git 
a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/Command.scala
 
b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/Command.scala
index 21077bc..6c38bde 100644
--- 
a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/Command.scala
+++ 
b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/Command.scala
@@ -65,6 +65,7 @@ case class MasterCommand(config: Config, version: String, 
masterAddr: HostPort)
     val properties = Array(
       
s"-D${Constants.GEARPUMP_CLUSTER_MASTERS}.0=${masterAddr.host}:${masterAddr.port}",
       s"-D${Constants.GEARPUMP_HOSTNAME}=${masterAddr.host}",
+      
s"-D${Constants.GEARPUMP_MASTER_RESOURCE_MANAGER_CONTAINER_ID}=${CONTAINER_ID}",
       
s"-D${Constants.GEARPUMP_HOME}=${LOCAL_DIRS}/${CONTAINER_ID}/pack/$version",
       s"-D${Constants.GEARPUMP_LOG_DAEMON_DIR}=${LOG_DIR_EXPANSION_VAR}",
       s"-D${Constants.GEARPUMP_LOG_APPLICATION_DIR}=${LOG_DIR_EXPANSION_VAR}")
@@ -79,6 +80,7 @@ case class WorkerCommand(config: Config, version: String, 
masterAddr: HostPort,
     val properties = Array(
       
s"-D${Constants.GEARPUMP_CLUSTER_MASTERS}.0=${masterAddr.host}:${masterAddr.port}",
       s"-D${Constants.GEARPUMP_LOG_DAEMON_DIR}=${LOG_DIR_EXPANSION_VAR}",
+      
s"-D${Constants.GEARPUMP_WORKER_RESOURCE_MANAGER_CONTAINER_ID}=${CONTAINER_ID}",
       
s"-D${Constants.GEARPUMP_HOME}=${LOCAL_DIRS}/${CONTAINER_ID}/pack/$version",
       s"-D${Constants.GEARPUMP_LOG_APPLICATION_DIR}=${LOG_DIR_EXPANSION_VAR}",
       s"-D${Constants.GEARPUMP_HOSTNAME}=$workerHost")

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/77a5bf77/services/dashboard/services/models/models.js
----------------------------------------------------------------------
diff --git a/services/dashboard/services/models/models.js 
b/services/dashboard/services/models/models.js
index fabde06..f05ff59 100644
--- a/services/dashboard/services/models/models.js
+++ b/services/dashboard/services/models/models.js
@@ -88,11 +88,11 @@ angular.module('io.gearpump.models', [])
           var obj = wrapper.masterDescription;
           angular.merge(obj, {
             // upickle conversion
-            cluster: _.map(obj.cluster, function(tuple) {
-              return tuple.join(':');
+            cluster: _.map(obj.cluster, function(node) {
+              return node.host + ":" + node.port;
             }),
             jvm: decoder._jvm(obj.jvmName),
-            leader: obj.leader.join(':'),
+            leader: obj.leader.host + ":" + obj.leader.port,
             // extra properties/methods
             isHealthy: obj.masterStatus === 'synced',
             configLink: restapi.masterConfigLink()

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/77a5bf77/services/jvm/src/main/scala/io/gearpump/services/RestServices.scala
----------------------------------------------------------------------
diff --git 
a/services/jvm/src/main/scala/io/gearpump/services/RestServices.scala 
b/services/jvm/src/main/scala/io/gearpump/services/RestServices.scala
index f2761d6..a44e0a9 100644
--- a/services/jvm/src/main/scala/io/gearpump/services/RestServices.scala
+++ b/services/jvm/src/main/scala/io/gearpump/services/RestServices.scala
@@ -60,8 +60,12 @@ class RestServices(master: ActorRef, mat: ActorMaterializer, 
system: ActorSystem
   private val static = new StaticService(system, supervisorPath).route
 
   def supervisor: ActorRef = {
-    val actorRef = system.actorSelection(supervisorPath).resolveOne()
-    Await.result(actorRef, new Timeout(Duration.create(5, "seconds")).duration)
+    if (supervisorPath == null || supervisorPath.isEmpty()) {
+      null
+    } else {
+      val actorRef = system.actorSelection(supervisorPath).resolveOne()
+      Await.result(actorRef, new Timeout(Duration.create(5, 
"seconds")).duration)
+    }
   }
 
   override def route: Route = {
@@ -83,7 +87,7 @@ class RestServices(master: ActorRef, mat: ActorMaterializer, 
system: ActorSystem
     val masterService = new MasterService(master, jarStoreService, system)
     val worker = new WorkerService(master, system)
     val app = new AppMasterService(master, jarStoreService, system)
-    val sup = new SupervisorService(supervisor, system)
+    val sup = new SupervisorService(master, supervisor, system)
 
     new RouteService {
       override def route: Route = {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/77a5bf77/services/jvm/src/main/scala/io/gearpump/services/SupervisorService.scala
----------------------------------------------------------------------
diff --git 
a/services/jvm/src/main/scala/io/gearpump/services/SupervisorService.scala 
b/services/jvm/src/main/scala/io/gearpump/services/SupervisorService.scala
index b3b1479..9ecd078 100644
--- a/services/jvm/src/main/scala/io/gearpump/services/SupervisorService.scala
+++ b/services/jvm/src/main/scala/io/gearpump/services/SupervisorService.scala
@@ -20,63 +20,91 @@ package io.gearpump.services
 
 import akka.actor.{ActorRef, ActorSystem}
 import akka.http.scaladsl.server.Directives._
+import akka.http.scaladsl.server.Route
 import akka.stream.Materializer
+import io.gearpump.cluster.AppMasterToMaster.{GetWorkerData, WorkerData, 
GetAllWorkers}
 import io.gearpump.cluster.ClientToMaster._
+import io.gearpump.services.SupervisorService.{Path, Status}
 import io.gearpump.util.ActorUtil._
 
+import scala.concurrent.Future
 import scala.util.{Failure, Success}
+import upickle.default.{read, write}
 
-
-class SupervisorService(val supervisor: ActorRef, override val system: 
ActorSystem)
+class SupervisorService(val master: ActorRef, val supervisor: ActorRef, 
override val system: ActorSystem)
   extends BasicService {
 
   import upickle.default.write
 
+  /**
+   * TODO: Add additional check to ensure the user has enough authorization 
to add/remove a worker machine
+   */
+  private def authorize(internal: Route): Route = {
+    if (supervisor == null) {
+      failWith(new Exception("API not enabled, cannot find a valid supervisor! 
Please make sure Gearpump is " +
+        "running on top of YARN or other resource managers"))
+    } else {
+      internal
+    }
+  }
+
   override def doRoute(implicit mat: Materializer) = pathPrefix("supervisor") {
     pathEnd {
       get {
-        complete(write(supervisor.path.toString))
+        val path = if (supervisor == null) {
+          null
+        } else {
+          supervisor.path.toString
+        }
+        complete(write(Path(path)))
       }
     } ~
-    path("addworker" / IntNumber) { count =>
+    path("status") {
       post {
-        onComplete(askActor[CommandResult](supervisor, AddWorker(count))) {
-          case Success(value) =>
-            complete(write(value))
-          case Failure(ex) =>
-            failWith(ex)
+        if (supervisor == null) {
+          complete(write(Status(enabled = false)))
+        } else {
+          complete(write(Status(enabled = true)))
         }
       }
     } ~
-      path("addmaster") {
-        post {
-          onComplete(askActor[CommandResult](supervisor, AddMaster)) {
+    path("addworker" / IntNumber) { workerCount =>
+      post {
+        authorize {
+          onComplete(askActor[CommandResult](supervisor, 
AddWorker(workerCount))) {
             case Success(value) =>
               complete(write(value))
             case Failure(ex) =>
               failWith(ex)
           }
         }
-      } ~
-        path("removemaster" / Segment) { uriPath =>
-          post {
-            onComplete(askActor[CommandResult](supervisor, 
RemoveMaster(uriPath))) {
-              case Success(value) =>
-                complete(write(value))
-              case Failure(ex) =>
-                failWith(ex)
+      }
+    } ~
+    path("removeworker" / IntNumber) { workerId =>
+      post {
+        authorize {
+
+          def future(): Future[CommandResult] = {
+            askWorker[WorkerData](master, workerId, 
GetWorkerData(workerId)).flatMap{workerData =>
+              val containerId = 
workerData.workerDescription.resourceManagerContainerId
+              askActor[CommandResult](supervisor, RemoveWorker(containerId))
             }
           }
-        } ~
-        path("removeworker" / Segment) { uriPath =>
-          post {
-            onComplete(askActor[CommandResult](supervisor, 
RemoveWorker(uriPath))) {
-              case Success(value) =>
-                complete(write(value))
-              case Failure(ex) =>
-                failWith(ex)
-            }
+
+          onComplete[CommandResult](future()) {
+            case Success(value) =>
+              complete(write(value))
+            case Failure(ex) =>
+              failWith(ex)
           }
         }
+      }
+    }
   }
 }
+
+object SupervisorService{
+  case class Status(enabled: Boolean)
+
+  case class Path(path: String)
+}

Reply via email to