[GEARPUMP-224] merge gearpump-daemon to gearpump-core

Author: huafengw <fvunic...@gmail.com>

Closes #98 from huafengw/merge.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/c3d5eb63
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/c3d5eb63
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/c3d5eb63

Branch: refs/heads/master
Commit: c3d5eb63f1d0c6c542268e21fe8356e042dfa232
Parents: a01809b
Author: huafengw <fvunic...@gmail.com>
Authored: Fri Oct 14 19:55:05 2016 +0800
Committer: manuzhang <owenzhang1...@gmail.com>
Committed: Fri Oct 14 19:55:05 2016 +0800

----------------------------------------------------------------------
 .../apache/gearpump/cluster/DaemonMessage.scala |  50 ++
 .../cluster/embedded/EmbeddedCluster.scala      |  95 +++
 .../apache/gearpump/cluster/main/Local.scala    |  89 +++
 .../apache/gearpump/cluster/main/Master.scala   | 236 ++++++++
 .../apache/gearpump/cluster/main/Worker.scala   |  70 +++
 .../gearpump/cluster/master/AppManager.scala    | 354 +++++++++++
 .../cluster/master/InMemoryKVService.scala      | 122 ++++
 .../apache/gearpump/cluster/master/Master.scala | 311 ++++++++++
 .../cluster/scheduler/PriorityScheduler.scala   | 154 +++++
 .../gearpump/cluster/scheduler/Scheduler.scala  |  77 +++
 .../worker/DefaultExecutorProcessLauncher.scala |  40 ++
 .../apache/gearpump/cluster/worker/Worker.scala | 579 ++++++++++++++++++
 .../apache/gearpump/cluster/MiniCluster.scala   |  73 +++
 .../cluster/appmaster/AppManagerSpec.scala      | 182 ++++++
 .../appmaster/InMemoryKVServiceSpec.scala       |  69 +++
 .../apache/gearpump/cluster/main/MainSpec.scala | 188 ++++++
 .../cluster/main/MasterWatcherSpec.scala        |  43 ++
 .../scheduler/PrioritySchedulerSpec.scala       | 230 ++++++++
 .../gearpump/cluster/worker/WorkerSpec.scala    | 128 ++++
 .../apache/gearpump/cluster/DaemonMessage.scala |  51 --
 .../cluster/embedded/EmbeddedCluster.scala      |  95 ---
 .../apache/gearpump/cluster/main/Local.scala    |  90 ---
 .../apache/gearpump/cluster/main/Master.scala   | 236 --------
 .../apache/gearpump/cluster/main/Worker.scala   |  71 ---
 .../gearpump/cluster/master/AppManager.scala    | 355 -----------
 .../cluster/master/InMemoryKVService.scala      | 122 ----
 .../apache/gearpump/cluster/master/Master.scala | 311 ----------
 .../cluster/scheduler/PriorityScheduler.scala   | 156 -----
 .../gearpump/cluster/scheduler/Scheduler.scala  |  79 ---
 .../worker/DefaultExecutorProcessLauncher.scala |  41 --
 .../apache/gearpump/cluster/worker/Worker.scala | 581 -------------------
 .../apache/gearpump/cluster/MiniCluster.scala   |  74 ---
 .../apache/gearpump/cluster/main/MainSpec.scala | 190 ------
 .../cluster/main/MasterWatcherSpec.scala        |  44 --
 .../cluster/master/AppManagerSpec.scala         | 184 ------
 .../cluster/master/InMemoryKVServiceSpec.scala  |  69 ---
 .../scheduler/PrioritySchedulerSpec.scala       | 232 --------
 .../gearpump/cluster/worker/WorkerSpec.scala    | 129 ----
 .../apache/gearpump/redis/RedisMessage.scala    | 148 +++--
 .../org/apache/gearpump/redis/RedisSink.scala   |  27 +-
 project/Build.scala                             |  41 +-
 project/BuildExample.scala                      |   8 +-
 project/Pack.scala                              |  14 +-
 43 files changed, 3227 insertions(+), 3211 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/core/src/main/scala/org/apache/gearpump/cluster/DaemonMessage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/DaemonMessage.scala b/core/src/main/scala/org/apache/gearpump/cluster/DaemonMessage.scala
new file mode 100644
index 0000000..1e94132
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/DaemonMessage.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster
+
+import akka.actor.ActorRef
+import org.apache.gearpump.cluster.master.Master.MasterInfo
+import org.apache.gearpump.cluster.scheduler.Resource
+import org.apache.gearpump.cluster.worker.WorkerId
+
+/**
+ * Cluster Bootup Flow
+ */
+object WorkerToMaster {
+
+  /** When a worker is started, it sends RegisterNewWorker */
+  case object RegisterNewWorker
+
+  /** When a worker loses connection with the master, it tries to register again with its old id. */
+  case class RegisterWorker(workerId: WorkerId)
+
+  /** The worker is responsible for broadcasting its current status to the master */
+  case class ResourceUpdate(worker: ActorRef, workerId: WorkerId, resource: Resource)
+}
+
+object MasterToWorker {
+
+  /** Master confirms the reception of RegisterNewWorker or RegisterWorker */
+  case class WorkerRegistered(workerId: WorkerId, masterInfo: MasterInfo)
+
+  /** The worker has not received a reply from the master */
+  case class UpdateResourceFailed(reason: String = null, ex: Throwable = null)
+
+  /** Master is synced with worker on resource slots managed by current worker */
+  case object UpdateResourceSucceed
+}
\ No newline at end of file
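
For orientation, a minimal sketch of how a worker-side actor could drive this handshake. This is illustrative only; the authoritative logic lives in org.apache.gearpump.cluster.worker.Worker added later in this commit, and the actor name below is made up:

    import akka.actor.{Actor, ActorRef}
    import org.apache.gearpump.cluster.MasterToWorker._
    import org.apache.gearpump.cluster.WorkerToMaster._
    import org.apache.gearpump.cluster.worker.WorkerId

    class RegistrationSketch(master: ActorRef) extends Actor {
      // A brand-new worker has no id yet, so it asks the master to assign one.
      override def preStart(): Unit = master ! RegisterNewWorker

      def receive: Receive = {
        case WorkerRegistered(workerId, masterInfo) =>
          // Keep the assigned id; after losing the connection we would re-register
          // with RegisterWorker(workerId) instead of RegisterNewWorker.
          context.become(registered(workerId))
      }

      def registered(workerId: WorkerId): Receive = {
        case UpdateResourceSucceed => // the master's view of our slots is in sync
        case UpdateResourceFailed(reason, _) => // re-send ResourceUpdate or escalate
      }
    }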

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala b/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala
new file mode 100644
index 0000000..9bde4d1
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.embedded
+
+import scala.collection.JavaConverters._
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
+import akka.actor.{ActorRef, ActorSystem, Props}
+import com.typesafe.config.{Config, ConfigValueFactory}
+
+import org.apache.gearpump.cluster.ClusterConfig
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.cluster.master.{Master => MasterActor}
+import org.apache.gearpump.cluster.worker.{Worker => WorkerActor}
+import org.apache.gearpump.util.Constants.{GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS, GEARPUMP_CLUSTER_MASTERS, GEARPUMP_METRIC_ENABLED, MASTER}
+import org.apache.gearpump.util.{LogUtil, Util}
+
+/**
+ * Creates an in-process cluster with a single worker
+ */
+class EmbeddedCluster(inputConfig: Config) {
+
+  private val workerCount: Int = 1
+  private var _master: ActorRef = null
+  private var _system: ActorSystem = null
+  private var _config: Config = null
+
+  private val LOG = LogUtil.getLogger(getClass)
+
+  def start(): Unit = {
+    val port = Util.findFreePort().get
+    val akkaConf = getConfig(inputConfig, port)
+    _config = akkaConf
+    val system = ActorSystem(MASTER, akkaConf)
+
+    val master = system.actorOf(Props[MasterActor], MASTER)
+
+    0.until(workerCount).foreach { id =>
+      system.actorOf(Props(classOf[WorkerActor], master), classOf[WorkerActor].getSimpleName + id)
+    }
+    this._master = master
+    this._system = system
+
+    LOG.info("=================================")
+    LOG.info("Local Cluster is started at: ")
+    LOG.info(s"                 127.0.0.1:$port")
+    LOG.info(s"To see UI, run command: services -master 127.0.0.1:$port")
+  }
+
+  private def getConfig(inputConfig: Config, port: Int): Config = {
+    val config = inputConfig.
+      withValue("akka.remote.netty.tcp.port", 
ConfigValueFactory.fromAnyRef(port)).
+      withValue(GEARPUMP_CLUSTER_MASTERS,
+        ConfigValueFactory.fromIterable(List(s"127.0.0.1:$port").asJava)).
+      withValue(GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS,
+        ConfigValueFactory.fromAnyRef(true)).
+      withValue(GEARPUMP_METRIC_ENABLED, ConfigValueFactory.fromAnyRef(true)).
+      withValue("akka.actor.provider",
+        ConfigValueFactory.fromAnyRef("akka.cluster.ClusterActorRefProvider"))
+    config
+  }
+
+  def newClientContext: ClientContext = {
+    ClientContext(_config, _system, _master)
+  }
+
+  def stop(): Unit = {
+    _system.stop(_master)
+    _system.terminate()
+    Await.result(_system.whenTerminated, Duration.Inf)
+  }
+}
+
+object EmbeddedCluster {
+  def apply(): EmbeddedCluster = {
+    new EmbeddedCluster(ClusterConfig.master())
+  }
+}
\ No newline at end of file
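
A rough usage sketch of the class above, showing the intended call sequence (the application submitted through the ClientContext is elided):

    val cluster = EmbeddedCluster()        // one master plus one worker inside this JVM
    cluster.start()
    val context = cluster.newClientContext // ClientContext bound to the embedded master
    // ... submit and run an application through `context` ...
    cluster.stop()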

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/core/src/main/scala/org/apache/gearpump/cluster/main/Local.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/main/Local.scala b/core/src/main/scala/org/apache/gearpump/cluster/main/Local.scala
new file mode 100644
index 0000000..db2cd8a
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/main/Local.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.main
+
+import akka.actor.{ActorSystem, Props}
+import com.typesafe.config.ConfigValueFactory
+import org.apache.gearpump.cluster.ClusterConfig
+import org.apache.gearpump.cluster.master.{Master => MasterActor}
+import org.apache.gearpump.cluster.worker.{Worker => WorkerActor}
+import org.apache.gearpump.util.Constants._
+import org.apache.gearpump.util.LogUtil.ProcessType
+import org.apache.gearpump.util.{ActorUtil, AkkaApp, Constants, LogUtil, Util}
+import org.slf4j.Logger
+
+import scala.collection.JavaConverters._
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
+object Local extends AkkaApp with ArgumentsParser {
+  override def akkaConfig: Config = ClusterConfig.master()
+
+  var LOG: Logger = LogUtil.getLogger(getClass)
+
+  override val options: Array[(String, CLIOption[Any])] =
+    Array("sameprocess" -> CLIOption[Boolean]("", required = false, 
defaultValue = Some(false)),
+      "workernum" -> CLIOption[Int]("<how many workers to start>", required = 
false,
+        defaultValue = Some(2)))
+
+  override val description = "Start a local cluster"
+
+  def main(akkaConf: Config, args: Array[String]): Unit = {
+
+    this.LOG = {
+      LogUtil.loadConfiguration(akkaConf, ProcessType.LOCAL)
+      LogUtil.getLogger(getClass)
+    }
+
+    val config = parse(args)
+    if (null != config) {
+      local(config.getInt("workernum"), config.getBoolean("sameprocess"), akkaConf)
+    }
+  }
+
+  def local(workerCount: Int, sameProcess: Boolean, akkaConf: Config): Unit = {
+    if (sameProcess) {
+      LOG.info("Starting local in same process")
+      System.setProperty("LOCAL", "true")
+    }
+    val masters = akkaConf.getStringList(Constants.GEARPUMP_CLUSTER_MASTERS)
+      .asScala.flatMap(Util.parseHostList)
+    val local = akkaConf.getString(Constants.GEARPUMP_HOSTNAME)
+
+    if (masters.size != 1 && masters.head.host != local) {
+      LOG.error(s"The ${Constants.GEARPUMP_CLUSTER_MASTERS} is not match " +
+        s"with ${Constants.GEARPUMP_HOSTNAME}")
+    } else {
+
+      val hostPort = masters.head
+      implicit val system = ActorSystem(MASTER, akkaConf.
+        withValue("akka.remote.netty.tcp.port", 
ConfigValueFactory.fromAnyRef(hostPort.port))
+      )
+
+      val master = system.actorOf(Props[MasterActor], MASTER)
+      val masterPath = ActorUtil.getSystemAddress(system).toString + s"/user/$MASTER"
+
+      0.until(workerCount).foreach { id =>
+        system.actorOf(Props(classOf[WorkerActor], master), classOf[WorkerActor].getSimpleName + id)
+      }
+
+      Await.result(system.whenTerminated, Duration.Inf)
+    }
+  }
+}
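
A hedged sketch of starting the same local cluster programmatically rather than from the command line, assuming the default ClusterConfig.master() configuration is usable as-is:

    import org.apache.gearpump.cluster.ClusterConfig
    import org.apache.gearpump.cluster.main.Local

    // Starts one master and two workers in this process and blocks until the actor system terminates.
    Local.local(workerCount = 2, sameProcess = false, akkaConf = ClusterConfig.master())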

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/core/src/main/scala/org/apache/gearpump/cluster/main/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/main/Master.scala b/core/src/main/scala/org/apache/gearpump/cluster/main/Master.scala
new file mode 100644
index 0000000..f758720
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/main/Master.scala
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.main
+
+import java.util.concurrent.TimeUnit
+
+import akka.actor._
+import akka.cluster.ClusterEvent._
+import akka.cluster.{MemberStatus, Member, Cluster}
+import akka.cluster.ddata.DistributedData
+import akka.cluster.singleton.{ClusterSingletonProxySettings, ClusterSingletonProxy, ClusterSingletonManagerSettings, ClusterSingletonManager}
+import com.typesafe.config.ConfigValueFactory
+import org.apache.gearpump.cluster.ClusterConfig
+import org.apache.gearpump.cluster.master.Master.MasterListUpdated
+import org.apache.gearpump.cluster.master.{Master => MasterActor, MasterNode}
+import org.apache.gearpump.util.Constants._
+import org.apache.gearpump.util.LogUtil.ProcessType
+import org.apache.gearpump.util.{AkkaApp, Constants, LogUtil}
+import org.slf4j.Logger
+
+import scala.collection.JavaConverters._
+import scala.collection.immutable
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+object Master extends AkkaApp with ArgumentsParser {
+
+  private var LOG: Logger = LogUtil.getLogger(getClass)
+
+  override def akkaConfig: Config = ClusterConfig.master()
+
+  override val options: Array[(String, CLIOption[Any])] =
+    Array("ip" -> CLIOption[String]("<master ip address>", required = true),
+      "port" -> CLIOption("<master port>", required = true))
+
+  override val description = "Start Master daemon"
+
+  def main(akkaConf: Config, args: Array[String]): Unit = {
+
+    this.LOG = {
+      LogUtil.loadConfiguration(akkaConf, ProcessType.MASTER)
+      LogUtil.getLogger(getClass)
+    }
+
+    val config = parse(args)
+    master(config.getString("ip"), config.getInt("port"), akkaConf)
+  }
+
+  private def verifyMaster(master: String, port: Int, masters: Iterable[String]) = {
+    masters.exists { hostPort =>
+      hostPort == s"$master:$port"
+    }
+  }
+
+  private def master(ip: String, port: Int, akkaConf: Config): Unit = {
+    val masters = akkaConf.getStringList(Constants.GEARPUMP_CLUSTER_MASTERS).asScala
+
+    if (!verifyMaster(ip, port, masters)) {
+      LOG.error(s"The provided ip $ip and port $port doesn't conform with config at " +
+        s"gearpump.cluster.masters: ${masters.mkString(", ")}")
+      System.exit(-1)
+    }
+
+    val masterList = masters.map(master => s"akka.tcp://${MASTER}@$master").toList.asJava
+    val quorum = masterList.size() / 2 + 1
+    val masterConfig = akkaConf.
+      withValue("akka.remote.netty.tcp.port", ConfigValueFactory.fromAnyRef(port)).
+      withValue(NETTY_TCP_HOSTNAME, ConfigValueFactory.fromAnyRef(ip)).
+      withValue("akka.cluster.seed-nodes", ConfigValueFactory.fromAnyRef(masterList)).
+      withValue(s"akka.cluster.role.${MASTER}.min-nr-of-members",
+        ConfigValueFactory.fromAnyRef(quorum))
+
+    LOG.info(s"Starting Master Actor system $ip:$port, master list: ${masters.mkString(";")}")
+    val system = ActorSystem(MASTER, masterConfig)
+
+    val replicator = DistributedData(system).replicator
+    LOG.info(s"Replicator path: ${replicator.path}")
+
+    // Starts singleton manager
+    val singletonManager = system.actorOf(ClusterSingletonManager.props(
+      singletonProps = Props(classOf[MasterWatcher], MASTER),
+      terminationMessage = PoisonPill,
+      settings = ClusterSingletonManagerSettings(system).withSingletonName(MASTER_WATCHER)
+        .withRole(MASTER)),
+      name = SINGLETON_MANAGER)
+
+    // Start master proxy
+    val masterProxy = system.actorOf(ClusterSingletonProxy.props(
+      singletonManagerPath = s"/user/${SINGLETON_MANAGER}",
+      // The effective singleton is s"${MASTER_WATCHER}/$MASTER" instead of s"${MASTER_WATCHER}".
+      // Master is created when there is a majority of machines started.
+      settings = ClusterSingletonProxySettings(system)
+        .withSingletonName(s"${MASTER_WATCHER}/$MASTER").withRole(MASTER)),
+      name = MASTER
+    )
+
+    LOG.info(s"master proxy is started at ${masterProxy.path}")
+
+    val mainThread = Thread.currentThread()
+    Runtime.getRuntime().addShutdownHook(new Thread() {
+      override def run(): Unit = {
+        if (!system.whenTerminated.isCompleted) {
+          LOG.info("Triggering shutdown hook....")
+
+          system.stop(masterProxy)
+          val cluster = Cluster(system)
+          cluster.leave(cluster.selfAddress)
+          cluster.down(cluster.selfAddress)
+          try {
+            Await.result(system.whenTerminated, Duration(3, TimeUnit.SECONDS))
+          } catch {
+            case ex: Exception => // Ignore
+          }
+          system.terminate()
+          mainThread.join()
+        }
+      }
+    })
+
+    Await.result(system.whenTerminated, Duration.Inf)
+  }
+}
+
+class MasterWatcher(role: String) extends Actor with ActorLogging {
+  import context.dispatcher
+
+  val cluster = Cluster(context.system)
+
+  val config = context.system.settings.config
+  val masters = config.getList("akka.cluster.seed-nodes")
+  val quorum = masters.size() / 2 + 1
+
+  val system = context.system
+
+  // Sorts by age, oldest first
+  val ageOrdering = Ordering.fromLessThan[Member] { (a, b) => a.isOlderThan(b) }
+  var membersByAge: immutable.SortedSet[Member] = immutable.SortedSet.empty(ageOrdering)
+
+  def receive: Receive = null
+
+  // Subscribes to MemberEvent, re-subscribe when restart
+  override def preStart(): Unit = {
+    cluster.subscribe(self, classOf[MemberEvent])
+    context.become(waitForInit)
+  }
+  override def postStop(): Unit = {
+    cluster.unsubscribe(self)
+  }
+
+  def matchingRole(member: Member): Boolean = member.hasRole(role)
+
+  def waitForInit: Receive = {
+    case state: CurrentClusterState => {
+      membersByAge = immutable.SortedSet.empty(ageOrdering) ++ state.members.filter(m =>
+        m.status == MemberStatus.Up && matchingRole(m))
+
+      if (membersByAge.size < quorum) {
+        membersByAge.iterator.mkString(",")
+        log.info(s"We cannot get a quorum, $quorum, " +
+          s"shutting down...${membersByAge.iterator.mkString(",")}")
+        context.become(waitForShutdown)
+        self ! MasterWatcher.Shutdown
+      } else {
+        val master = context.actorOf(Props(classOf[MasterActor]), MASTER)
+        notifyMasterMembersChange(master)
+        context.become(waitForClusterEvent(master))
+      }
+    }
+  }
+
+  def waitForClusterEvent(master: ActorRef): Receive = {
+    case MemberUp(m) if matchingRole(m) => {
+      membersByAge += m
+      notifyMasterMembersChange(master)
+    }
+    case mEvent: MemberEvent if (mEvent.isInstanceOf[MemberExited] ||
+      mEvent.isInstanceOf[MemberRemoved]) && matchingRole(mEvent.member) => {
+      log.info(s"member removed ${mEvent.member}")
+      val m = mEvent.member
+      membersByAge -= m
+      if (membersByAge.size < quorum) {
+        log.info(s"We cannot get a quorum, $quorum, " +
+          s"shutting down...${membersByAge.iterator.mkString(",")}")
+        context.become(waitForShutdown)
+        self ! MasterWatcher.Shutdown
+      } else {
+        notifyMasterMembersChange(master)
+      }
+    }
+  }
+
+  private def notifyMasterMembersChange(master: ActorRef): Unit = {
+    val masters = membersByAge.toList.map{ member =>
+      MasterNode(member.address.host.getOrElse("Unknown-Host"),
+        member.address.port.getOrElse(0))
+    }
+    master ! MasterListUpdated(masters)
+  }
+
+  def waitForShutdown: Receive = {
+    case MasterWatcher.Shutdown => {
+      cluster.unsubscribe(self)
+      cluster.leave(cluster.selfAddress)
+      context.stop(self)
+      system.scheduler.scheduleOnce(Duration.Zero) {
+        try {
+          Await.result(system.whenTerminated, Duration(3, TimeUnit.SECONDS))
+        } catch {
+          case ex: Exception => // Ignore
+        }
+        system.terminate()
+      }
+    }
+  }
+}
+
+object MasterWatcher {
+  object Shutdown
+}
+
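
The seed-node and min-nr-of-members settings above encode a majority quorum, and MasterWatcher only creates the Master singleton once that many role members are Up. A quick check of the arithmetic used in master() and MasterWatcher:

    // Mirrors `masterList.size() / 2 + 1`: three configured masters tolerate one failure.
    val quorumOfThree = 3 / 2 + 1 // = 2
    val quorumOfOne = 1 / 2 + 1   // = 1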

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/core/src/main/scala/org/apache/gearpump/cluster/main/Worker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/main/Worker.scala b/core/src/main/scala/org/apache/gearpump/cluster/main/Worker.scala
new file mode 100644
index 0000000..3d8d823
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/main/Worker.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.main
+
+import akka.actor.{ActorSystem, Props}
+import org.apache.gearpump.cluster.ClusterConfig
+import org.apache.gearpump.cluster.master.MasterProxy
+import org.apache.gearpump.cluster.worker.{Worker => WorkerActor}
+import org.apache.gearpump.transport.HostPort
+import org.apache.gearpump.util.Constants._
+import org.apache.gearpump.util.LogUtil.ProcessType
+import org.apache.gearpump.util.{AkkaApp, LogUtil}
+import org.slf4j.Logger
+
+import scala.collection.JavaConverters._
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
+/** Tool to start a worker daemon process */
+object Worker extends AkkaApp with ArgumentsParser {
+  protected override def akkaConfig = ClusterConfig.worker()
+
+  override val description = "Start a worker daemon"
+
+  var LOG: Logger = LogUtil.getLogger(getClass)
+
+  private def uuid = java.util.UUID.randomUUID.toString
+
+  def main(akkaConf: Config, args: Array[String]): Unit = {
+    val id = uuid
+
+    this.LOG = {
+      LogUtil.loadConfiguration(akkaConf, ProcessType.WORKER)
+      // Delay creation of LOG instance to avoid creating an empty log file as we
+      // reset the log file name here
+      LogUtil.getLogger(getClass)
+    }
+
+    val system = ActorSystem(id, akkaConf)
+
+    val masterAddress = akkaConf.getStringList(GEARPUMP_CLUSTER_MASTERS).asScala.map { address =>
+      val hostAndPort = address.split(":")
+      HostPort(hostAndPort(0), hostAndPort(1).toInt)
+    }
+
+    LOG.info(s"Trying to connect to masters " + masterAddress.mkString(",") + 
"...")
+    val masterProxy = system.actorOf(MasterProxy.props(masterAddress), s"masterproxy${system.name}")
+
+    system.actorOf(Props(classOf[WorkerActor], masterProxy),
+      classOf[WorkerActor].getSimpleName + id)
+
+    Await.result(system.whenTerminated, Duration.Inf)
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala b/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala
new file mode 100644
index 0000000..0ae7365
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.master
+
+import akka.actor._
+import akka.pattern.ask
+import org.apache.gearpump.cluster.AppMasterToMaster.{AppDataSaved, SaveAppDataFailed, _}
+import org.apache.gearpump.cluster.AppMasterToWorker._
+import org.apache.gearpump.cluster.ClientToMaster._
+import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMasterDataRequest, AppMastersDataRequest, _}
+import org.apache.gearpump.cluster.MasterToClient._
+import org.apache.gearpump.cluster.WorkerToAppMaster.{ShutdownExecutorFailed, _}
+import org.apache.gearpump.cluster.appmaster.{AppMasterRuntimeInfo, ApplicationState}
+import org.apache.gearpump.cluster.master.AppManager._
+import org.apache.gearpump.cluster.master.InMemoryKVService.{GetKVResult, PutKVResult, PutKVSuccess, _}
+import org.apache.gearpump.cluster.master.Master._
+import org.apache.gearpump.util.Constants._
+import org.apache.gearpump.util.{ActorUtil, TimeOutScheduler, Util, _}
+import org.slf4j.Logger
+
+import scala.concurrent.Future
+import scala.concurrent.duration._
+import scala.util.{Failure, Success}
+
+/**
+ * AppManager is a dedicated child of the Master that manages all applications.
+ */
+private[cluster] class AppManager(kvService: ActorRef, launcher: AppMasterLauncherFactory)
+  extends Actor with Stash with TimeOutScheduler {
+
+  private val LOG: Logger = LogUtil.getLogger(getClass)
+
+  private val EXECUTOR_ID: Int = APPMASTER_DEFAULT_EXECUTOR_ID
+  private val appMasterMaxRetries: Int = 5
+  private val appMasterRetryTimeRange: Duration = 20.seconds
+
+  implicit val timeout = FUTURE_TIMEOUT
+  implicit val executionContext = context.dispatcher
+
+  // Next available appId
+  private var nextAppId: Int = 1
+
+  // From appId to appMaster data
+  // Applications not in activeAppMasters or deadAppMasters are in pending status
+  private var appMasterRegistry = Map.empty[Int, (ActorRef, AppMasterRuntimeInfo)]
+
+  // Active appMaster list where applications are in active status
+  private var activeAppMasters = Set.empty[Int]
+
+  // Dead appMaster list where applications are in inactive status
+  private var deadAppMasters = Set.empty[Int]
+
+  private var appMasterRestartPolicies = Map.empty[Int, RestartPolicy]
+
+  def receive: Receive = null
+
+  kvService ! GetKV(MASTER_GROUP, MASTER_STATE)
+  context.become(waitForMasterState)
+
+  def waitForMasterState: Receive = {
+    case GetKVSuccess(_, result) =>
+      val masterState = result.asInstanceOf[MasterState]
+      if (masterState != null) {
+        this.nextAppId = masterState.maxId + 1
+        this.activeAppMasters = masterState.activeAppMasters
+        this.deadAppMasters = masterState.deadAppMasters
+        this.appMasterRegistry = masterState.appMasterRegistry
+      }
+      context.become(receiveHandler)
+      unstashAll()
+    case GetKVFailed(ex) =>
+      LOG.error("Failed to get master state, shutting down master to avoid 
data corruption...")
+      context.parent ! PoisonPill
+    case msg =>
+      LOG.info(s"Get message ${msg.getClass.getSimpleName}")
+      stash()
+  }
+
+  def receiveHandler: Receive = {
+    val msg = "Application Manager started. Ready for application 
submission..."
+    LOG.info(msg)
+    clientMsgHandler orElse appMasterMessage orElse selfMsgHandler orElse workerMessage orElse
+      appDataStoreService orElse terminationWatch
+  }
+
+  def clientMsgHandler: Receive = {
+    case SubmitApplication(app, jar, username) =>
+      LOG.info(s"Submit Application ${app.name}($nextAppId) by $username...")
+      val client = sender()
+      if (applicationNameExist(app.name)) {
+        client ! SubmitApplicationResult(Failure(
+          new Exception(s"Application name ${app.name} already existed")))
+      } else {
+        context.actorOf(launcher.props(nextAppId, EXECUTOR_ID, app, jar, username, context.parent,
+          Some(client)), s"launcher${nextAppId}_${Util.randInt()}")
+
+        val appState = new ApplicationState(nextAppId, app.name, 0, app, jar, username, null)
+        appMasterRestartPolicies += nextAppId ->
+          new RestartPolicy(appMasterMaxRetries, appMasterRetryTimeRange)
+        kvService ! PutKV(nextAppId.toString, APP_STATE, appState)
+        nextAppId += 1
+      }
+
+    case RestartApplication(appId) =>
+      val client = sender()
+      (kvService ? GetKV(appId.toString, APP_STATE)).asInstanceOf[Future[GetKVResult]].map {
+        case GetKVSuccess(_, result) =>
+          val appState = result.asInstanceOf[ApplicationState]
+          if (appState != null) {
+            LOG.info(s"Shutting down the application (restart), $appId")
+            self ! ShutdownApplication(appId)
+            self.tell(SubmitApplication(appState.app, appState.jar, appState.username), client)
+          } else {
+            client ! SubmitApplicationResult(Failure(
+              new Exception(s"Failed to restart, because the application 
$appId does not exist.")
+            ))
+          }
+        case GetKVFailed(ex) =>
+          client ! SubmitApplicationResult(Failure(
+            new Exception(s"Unable to obtain the Master State. " +
+              s"Application $appId will not be restarted.")
+          ))
+      }
+
+    case ShutdownApplication(appId) =>
+      LOG.info(s"App Manager Shutting down application $appId")
+      val (_, appInfo) = appMasterRegistry.get(appId)
+        .filter { case (_, info) => !deadAppMasters.contains(info.appId)}
+        .getOrElse((null, null))
+      Option(appInfo) match {
+        case Some(info) =>
+          val worker = info.worker
+          val workerPath = Option(worker).map(_.path).orNull
+          LOG.info(s"Shutdown AppMaster at $workerPath, appId: $appId, 
executorId: $EXECUTOR_ID")
+          cleanApplicationData(appId)
+          val shutdown = ShutdownExecutor(appId, EXECUTOR_ID,
+            s"AppMaster $appId shutdown requested by master...")
+          sendMsgWithTimeOutCallBack(worker, shutdown, 30000, shutDownExecutorTimeOut())
+          sender ! ShutdownApplicationResult(Success(appId))
+        case None =>
+          val errorMsg = s"Failed to find registration information for appId: 
$appId"
+          LOG.error(errorMsg)
+          sender ! ShutdownApplicationResult(Failure(new Exception(errorMsg)))
+      }
+
+    case ResolveAppId(appId) =>
+      val (appMaster, _) = appMasterRegistry.getOrElse(appId, (null, null))
+      if (null != appMaster) {
+        sender ! ResolveAppIdResult(Success(appMaster))
+      } else {
+        sender ! ResolveAppIdResult(Failure(new Exception(s"Can not find Application: $appId")))
+      }
+
+    case AppMastersDataRequest =>
+      var appMastersData = collection.mutable.ListBuffer[AppMasterData]()
+      appMasterRegistry.foreach(pair => {
+        val (id, (appMaster: ActorRef, info: AppMasterRuntimeInfo)) = pair
+        val appMasterPath = ActorUtil.getFullPath(context.system, appMaster.path)
+        val workerPath = Option(info.worker).map(worker =>
+          ActorUtil.getFullPath(context.system, worker.path))
+        val status = getAppMasterStatus(id)
+        appMastersData += AppMasterData(
+          status, id, info.appName, appMasterPath, workerPath.orNull,
+          info.submissionTime, info.startTime, info.finishTime, info.user)
+      })
+
+      sender ! AppMastersData(appMastersData.toList)
+
+    case QueryAppMasterConfig(appId) =>
+      val config =
+        if (appMasterRegistry.contains(appId)) {
+          val (_, info) = appMasterRegistry(appId)
+          info.config
+        } else {
+          null
+        }
+      sender ! AppMasterConfig(config)
+
+    case appMasterDataRequest: AppMasterDataRequest =>
+      val appId = appMasterDataRequest.appId
+      val appStatus = getAppMasterStatus(appId)
+
+      appStatus match {
+        case AppMasterNonExist =>
+          sender ! AppMasterData(AppMasterNonExist)
+        case _ =>
+          val (appMaster, info) = appMasterRegistry(appId)
+          val appMasterPath = ActorUtil.getFullPath(context.system, appMaster.path)
+          val workerPath = Option(info.worker).map(
+            worker => ActorUtil.getFullPath(context.system, worker.path)).orNull
+          sender ! AppMasterData(
+            appStatus, appId, info.appName, appMasterPath, workerPath,
+            info.submissionTime, info.startTime, info.finishTime, info.user)
+      }
+  }
+
+  def workerMessage: Receive = {
+    case ShutdownExecutorSucceed(appId, executorId) =>
+      LOG.info(s"Shut down executor $executorId for application $appId 
successfully")
+    case failed: ShutdownExecutorFailed =>
+      LOG.error(failed.reason)
+  }
+
+  private def getAppMasterStatus(appId: Int): AppMasterStatus = {
+    if (activeAppMasters.contains(appId)) {
+      AppMasterActive
+    } else if (deadAppMasters.contains(appId)) {
+      AppMasterInActive
+    } else if (appMasterRegistry.contains(appId)) {
+      AppMasterPending
+    } else {
+      AppMasterNonExist
+    }
+  }
+
+  private def shutDownExecutorTimeOut(): Unit = {
+    LOG.error(s"Shut down executor time out")
+  }
+
+  def appMasterMessage: Receive = {
+    case RegisterAppMaster(appMaster, registerBack: AppMasterRuntimeInfo) =>
+      val startTime = System.currentTimeMillis()
+      val register = registerBack.copy(startTime = startTime)
+
+      LOG.info(s"Register AppMaster for app: ${register.appId}, $register")
+      context.watch(appMaster)
+      appMasterRegistry += register.appId -> (appMaster, register)
+      kvService ! PutKV(MASTER_GROUP, MASTER_STATE,
+        MasterState(nextAppId, appMasterRegistry, activeAppMasters, deadAppMasters))
+      sender ! AppMasterRegistered(register.appId)
+
+    case ActivateAppMaster(appId) =>
+      LOG.info(s"Activate AppMaster for app $appId")
+      activeAppMasters += appId
+      kvService ! PutKV(MASTER_GROUP, MASTER_STATE,
+        MasterState(this.nextAppId, appMasterRegistry, activeAppMasters, deadAppMasters))
+      sender ! AppMasterActivated(appId)
+  }
+
+  def appDataStoreService: Receive = {
+    case SaveAppData(appId, key, value) =>
+      val client = sender()
+      (kvService ? PutKV(appId.toString, key, value)).asInstanceOf[Future[PutKVResult]].map {
+        case PutKVSuccess =>
+          client ! AppDataSaved
+        case PutKVFailed(k, ex) =>
+          client ! SaveAppDataFailed
+      }
+    case GetAppData(appId, key) =>
+      val client = sender()
+      (kvService ? GetKV(appId.toString, key)).asInstanceOf[Future[GetKVResult]].map {
+        case GetKVSuccess(privateKey, value) =>
+          client ! GetAppDataResult(key, value)
+        case GetKVFailed(ex) =>
+          client ! GetAppDataResult(key, null)
+      }
+  }
+
+  def terminationWatch: Receive = {
+    case terminate: Terminated =>
+      LOG.info(s"AppMaster(${terminate.actor.path}) is terminated, " +
+        s"network down: ${terminate.getAddressTerminated}")
+
+      // Now we assume that the only normal way to stop the application is submitting a
+      // ShutdownApplication request
+      val application = appMasterRegistry.find { appInfo =>
+        val (_, (actorRef, _)) = appInfo
+        actorRef.compareTo(terminate.actor) == 0
+      }
+      if (application.nonEmpty) {
+        val appId = application.get._1
+        (kvService ? GetKV(appId.toString, APP_STATE)).asInstanceOf[Future[GetKVResult]].map {
+          case GetKVSuccess(_, result) =>
+            val appState = result.asInstanceOf[ApplicationState]
+            if (appState != null) {
+              LOG.info(s"Recovering application, $appId")
+              self ! RecoverApplication(appState)
+            } else {
+              LOG.error(s"Cannot find application state for $appId")
+            }
+          case GetKVFailed(ex) =>
+            LOG.error(s"Cannot find master state to recover")
+        }
+      }
+  }
+
+  def selfMsgHandler: Receive = {
+    case RecoverApplication(state) =>
+      val appId = state.appId
+      if (appMasterRestartPolicies.get(appId).get.allowRestart) {
+        LOG.info(s"AppManager Recovering Application $appId...")
+        activeAppMasters -= appId
+        kvService ! PutKV(MASTER_GROUP, MASTER_STATE,
+          MasterState(this.nextAppId, appMasterRegistry, activeAppMasters, deadAppMasters))
+        context.actorOf(launcher.props(appId, EXECUTOR_ID, state.app, state.jar, state.username,
+          context.parent, None), s"launcher${appId}_${Util.randInt()}")
+      } else {
+        LOG.error(s"Application $appId failed too many times")
+      }
+  }
+
+  case class RecoverApplication(applicationStatus: ApplicationState)
+
+  private def cleanApplicationData(appId: Int): Unit = {
+    if (appMasterRegistry.contains(appId)) {
+      // Add the dead app to dead appMasters
+      deadAppMasters += appId
+      // Remove the dead app from active appMasters
+      activeAppMasters -= appId
+
+      appMasterRegistry += appId -> {
+        val (ref, info) = appMasterRegistry(appId)
+        (ref, info.copy(finishTime = System.currentTimeMillis()))
+      }
+      kvService ! PutKV(MASTER_GROUP, MASTER_STATE,
+        MasterState(this.nextAppId, appMasterRegistry, activeAppMasters, deadAppMasters))
+      kvService ! DeleteKVGroup(appId.toString)
+    }
+  }
+
+  private def applicationNameExist(appName: String): Boolean = {
+    appMasterRegistry.values.exists { case (_, info) =>
+      info.appName == appName && !deadAppMasters.contains(info.appId)
+    }
+  }
+}
+
+object AppManager {
+  final val APP_STATE = "app_state"
+  // The id is used in KVStore
+  final val MASTER_STATE = "master_state"
+
+  case class MasterState(
+      maxId: Int,
+      appMasterRegistry: Map[Int, (ActorRef, AppMasterRuntimeInfo)],
+      activeAppMasters: Set[Int],
+      deadAppMasters: Set[Int])
+}
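
A hedged sketch of the client side of clientMsgHandler above, talking to the Master (which forwards these messages to this AppManager). The master reference, appId and ask timeout are placeholders; the message types are the ones imported at the top of this file:

    import java.util.concurrent.TimeUnit
    import akka.pattern.ask
    import akka.util.Timeout
    import org.apache.gearpump.cluster.ClientToMaster.{ResolveAppId, ShutdownApplication}
    import org.apache.gearpump.cluster.MasterToClient.{ResolveAppIdResult, ShutdownApplicationResult}

    implicit val timeout: Timeout = Timeout(15, TimeUnit.SECONDS)

    // Resolve a running application's AppMaster, then ask for the application to be shut down.
    val appMaster = (master ? ResolveAppId(appId)).mapTo[ResolveAppIdResult]
    val shutdown = (master ? ShutdownApplication(appId)).mapTo[ShutdownApplicationResult]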

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/core/src/main/scala/org/apache/gearpump/cluster/master/InMemoryKVService.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/master/InMemoryKVService.scala b/core/src/main/scala/org/apache/gearpump/cluster/master/InMemoryKVService.scala
new file mode 100644
index 0000000..fd19bad
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/master/InMemoryKVService.scala
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.master
+
+import java.util.concurrent.TimeUnit
+
+import akka.actor._
+import akka.cluster.Cluster
+import akka.cluster.ddata.{LWWMap, LWWMapKey, DistributedData}
+import akka.cluster.ddata.Replicator._
+import org.apache.gearpump.util.LogUtil
+import org.slf4j.Logger
+
+import scala.concurrent.TimeoutException
+import scala.concurrent.duration.Duration
+
+/**
+ * A simple replicated in-memory KV service. Replicas are stored on all masters.
+ */
+class InMemoryKVService extends Actor with Stash {
+  import org.apache.gearpump.cluster.master.InMemoryKVService._
+
+  private val KV_SERVICE = "gearpump_kvservice"
+
+  private val LOG: Logger = LogUtil.getLogger(getClass)
+  private val replicator = DistributedData(context.system).replicator
+  private implicit val cluster = Cluster(context.system)
+
+  // Optimize write path, we can tolerate one master down for recovery.
+  private val timeout = Duration(15, TimeUnit.SECONDS)
+  private val readMajority = ReadMajority(timeout)
+  private val writeMajority = WriteMajority(timeout)
+
+  private def groupKey(group: String): LWWMapKey[Any] = {
+    LWWMapKey[Any](KV_SERVICE + "_" + group)
+  }
+
+  def receive: Receive = kvService
+
+  def kvService: Receive = {
+
+    case GetKV(group: String, key: String) =>
+      val request = Request(sender(), key)
+      replicator ! Get(groupKey(group), readMajority, Some(request))
+    case success@GetSuccess(group: LWWMapKey[Any @unchecked], Some(request: Request)) =>
+      val appData = success.get(group)
+      LOG.info(s"Successfully retrived group: ${group.id}")
+      request.client ! GetKVSuccess(request.key, 
appData.get(request.key).orNull)
+    case NotFound(group: LWWMapKey[Any @unchecked], Some(request: Request)) =>
+      LOG.info(s"We cannot find group $group")
+      request.client ! GetKVSuccess(request.key, null)
+    case GetFailure(group: LWWMapKey[Any @unchecked], Some(request: Request)) =>
+      val error = s"Failed to get application data, the request key is 
${request.key}"
+      LOG.error(error)
+      request.client ! GetKVFailed(new Exception(error))
+
+    case PutKV(group: String, key: String, value: Any) =>
+      val request = Request(sender(), key)
+      val update = Update(groupKey(group), LWWMap(), writeMajority, Some(request)) { map =>
+        map + (key -> value)
+      }
+      replicator ! update
+    case UpdateSuccess(group: LWWMapKey[Any @unchecked], Some(request: Request)) =>
+      request.client ! PutKVSuccess
+    case ModifyFailure(group: LWWMapKey[Any @unchecked], error, cause, Some(request: Request)) =>
+      request.client ! PutKVFailed(request.key, new Exception(error, cause))
+    case UpdateTimeout(group: LWWMapKey[Any @unchecked], Some(request: Request)) =>
+      request.client ! PutKVFailed(request.key, new TimeoutException())
+
+    case delete@DeleteKVGroup(group: String) =>
+      replicator ! Delete(groupKey(group), writeMajority)
+    case DeleteSuccess(group) =>
+      LOG.info(s"KV Group ${group.id} is deleted")
+    case ReplicationDeleteFailure(group) =>
+      LOG.error(s"Failed to delete KV Group ${group.id}...")
+    case DataDeleted(group) =>
+      LOG.error(s"Group ${group.id} is deleted, you can no longer 
put/get/delete this group...")
+  }
+}
+
+object InMemoryKVService {
+  /**
+   * KV Service related
+   */
+  case class GetKV(group: String, key: String)
+
+  trait GetKVResult
+
+  case class GetKVSuccess(key: String, value: Any) extends GetKVResult
+
+  case class GetKVFailed(ex: Throwable) extends GetKVResult
+
+  case class PutKV(group: String, key: String, value: Any)
+
+  case class DeleteKVGroup(group: String)
+
+  case class GroupDeleted(group: String) extends GetKVResult with PutKVResult
+
+  trait PutKVResult
+
+  case object PutKVSuccess extends PutKVResult
+
+  case class PutKVFailed(key: String, ex: Throwable) extends PutKVResult
+
+  case class Request(client: ActorRef, key: String)
+}
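
A minimal sketch of talking to this service from another actor, mirroring how AppManager and Master use it in this commit; kvService is a placeholder ActorRef to an InMemoryKVService instance and an implicit ask timeout is assumed to be in scope:

    import akka.pattern.ask
    import org.apache.gearpump.cluster.master.InMemoryKVService._

    (kvService ? PutKV("app-1", "app_state", state)).mapTo[PutKVResult] // PutKVSuccess or PutKVFailed
    (kvService ? GetKV("app-1", "app_state")).mapTo[GetKVResult]        // GetKVSuccess(key, value) or GetKVFailed
    kvService ! DeleteKVGroup("app-1")                                  // fire-and-forget removal of the whole group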

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/core/src/main/scala/org/apache/gearpump/cluster/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/master/Master.scala b/core/src/main/scala/org/apache/gearpump/cluster/master/Master.scala
new file mode 100644
index 0000000..6b4df07
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/master/Master.scala
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.master
+
+import java.lang.management.ManagementFactory
+import org.apache.gearpump.cluster.worker.WorkerId
+import org.apache.gearpump.jarstore.JarStoreServer
+
+import scala.collection.JavaConverters._
+import scala.collection.immutable
+
+import akka.actor._
+import akka.remote.DisassociatedEvent
+import com.typesafe.config.Config
+import org.apache.commons.lang.exception.ExceptionUtils
+import org.slf4j.Logger
+
+import org.apache.gearpump.cluster.AppMasterToMaster._
+import org.apache.gearpump.cluster.ClientToMaster._
+import org.apache.gearpump.cluster.ClusterConfig
+import org.apache.gearpump.cluster.MasterToAppMaster._
+import org.apache.gearpump.cluster.MasterToClient.{HistoryMetrics, HistoryMetricsItem, MasterConfig, ResolveWorkerIdResult}
+import org.apache.gearpump.cluster.MasterToWorker._
+import org.apache.gearpump.cluster.WorkerToMaster._
+import org.apache.gearpump.cluster.master.InMemoryKVService._
+import org.apache.gearpump.cluster.master.Master.{MasterInfo, WorkerTerminated, _}
+import org.apache.gearpump.cluster.scheduler.Scheduler.ApplicationFinished
+import org.apache.gearpump.metrics.Metrics.ReportMetrics
+import org.apache.gearpump.metrics.{JvmMetricsSet, Metrics, MetricsReporterService}
+import org.apache.gearpump.transport.HostPort
+import org.apache.gearpump.util.Constants._
+import org.apache.gearpump.util.HistoryMetricsService.HistoryMetricsConfig
+import org.apache.gearpump.util._
+
+/**
+ * Master actor that manages the resources of the whole cluster.
+ * It is similar to the resource manager in YARN.
+ */
+private[cluster] class Master extends Actor with Stash {
+  private val LOG: Logger = LogUtil.getLogger(getClass)
+  private val systemConfig: Config = context.system.settings.config
+  private implicit val timeout = Constants.FUTURE_TIMEOUT
+  private val kvService = context.actorOf(Props(new InMemoryKVService()), "kvService")
+  // Resources and resourceRequests can be dynamically constructed by
+  // heartbeat of worker and appmaster when master singleton is migrated.
+  // We don't need to persist them in cluster
+  private var appManager: ActorRef = null
+
+  private var scheduler: ActorRef = null
+
+  private var workers = new immutable.HashMap[ActorRef, WorkerId]
+
+  private val birth = System.currentTimeMillis()
+
+  private var nextWorkerId = 0
+
+  def receive: Receive = null
+
+  // Register jvm metrics
+  Metrics(context.system).register(new JvmMetricsSet(s"master"))
+
+  LOG.info("master is started at " + ActorUtil.getFullPath(context.system, 
self.path) + "...")
+
+  val jarStoreRootPath = systemConfig.getString(Constants.GEARPUMP_APP_JAR_STORE_ROOT_PATH)
+
+  private val jarStore = context.actorOf(Props(classOf[JarStoreServer], jarStoreRootPath))
+
+  private val hostPort = HostPort(ActorUtil.getSystemAddress(context.system).hostPort)
+
+  // Maintain the list of active masters.
+  private var masters: List[MasterNode] = {
+    // Add myself into the list of initial masters.
+    List(MasterNode(hostPort.host, hostPort.port))
+  }
+
+  val metricsEnabled = systemConfig.getBoolean(GEARPUMP_METRIC_ENABLED)
+
+  val getHistoryMetricsConfig = HistoryMetricsConfig(systemConfig)
+  val historyMetricsService = if (metricsEnabled) {
+    val historyMetricsService = {
+      context.actorOf(Props(new HistoryMetricsService("master", getHistoryMetricsConfig)))
+    }
+
+    val metricsReportService = context.actorOf(
+      Props(new MetricsReporterService(Metrics(context.system))))
+    historyMetricsService.tell(ReportMetrics, metricsReportService)
+    Some(historyMetricsService)
+  } else {
+    None
+  }
+
+  kvService ! GetKV(MASTER_GROUP, WORKER_ID)
+  context.become(waitForNextWorkerId)
+
+  def waitForNextWorkerId: Receive = {
+    case GetKVSuccess(_, result) =>
+      if (result != null) {
+        this.nextWorkerId = result.asInstanceOf[Int]
+      } else {
+        LOG.warn("Cannot find existing state in the distributed cluster...")
+      }
+      context.become(receiveHandler)
+      unstashAll()
+    case GetKVFailed(ex) =>
+      LOG.error("Failed to get worker id, shutting down master to avoid data 
corruption...")
+      context.parent ! PoisonPill
+    case msg =>
+      LOG.info(s"Get message ${msg.getClass.getSimpleName}")
+      stash()
+  }
+
+  def receiveHandler: Receive = workerMsgHandler orElse
+    appMasterMsgHandler orElse
+    onMasterListChange orElse
+    clientMsgHandler orElse
+    metricsService orElse
+    jarStoreService orElse
+    terminationWatch orElse
+    disassociated orElse
+    kvServiceMsgHandler orElse
+    ActorUtil.defaultMsgHandler(self)
+
+  def workerMsgHandler: Receive = {
+    case RegisterNewWorker =>
+      val workerId = WorkerId(nextWorkerId, System.currentTimeMillis())
+      nextWorkerId += 1
+      kvService ! PutKV(MASTER_GROUP, WORKER_ID, nextWorkerId)
+      val workerHostname = ActorUtil.getHostname(sender())
+      LOG.info(s"Register new from $workerHostname ....")
+      self forward RegisterWorker(workerId)
+
+    case RegisterWorker(id) =>
+      context.watch(sender())
+      sender ! WorkerRegistered(id, MasterInfo(self, birth))
+      scheduler forward WorkerRegistered(id, MasterInfo(self, birth))
+      workers += (sender() -> id)
+      val workerHostname = ActorUtil.getHostname(sender())
+      LOG.info(s"Register Worker with id $id from $workerHostname ....")
+    case resourceUpdate: ResourceUpdate =>
+      scheduler forward resourceUpdate
+  }
+
+  def jarStoreService: Receive = {
+    case GetJarStoreServer =>
+      jarStore forward GetJarStoreServer
+  }
+
+  def kvServiceMsgHandler: Receive = {
+    case PutKVSuccess =>
+    // Skip
+    case PutKVFailed(key, exception) =>
+      LOG.error(s"Put KV of key $key to InMemoryKVService failed.\n" +
+        ExceptionUtils.getStackTrace(exception))
+  }
+
+  def metricsService: Receive = {
+    case query: QueryHistoryMetrics =>
+      if (historyMetricsService.isEmpty) {
+        // Returns empty metrics so that we don't hang the UI
+        sender ! HistoryMetrics(query.path, List.empty[HistoryMetricsItem])
+      } else {
+        historyMetricsService.get forward query
+      }
+  }
+
+  def appMasterMsgHandler: Receive = {
+    case request: RequestResource =>
+      scheduler forward request
+    case registerAppMaster: RegisterAppMaster =>
+      appManager forward registerAppMaster
+    case activateAppMaster: ActivateAppMaster =>
+      appManager forward activateAppMaster
+    case save: SaveAppData =>
+      appManager forward save
+    case get: GetAppData =>
+      appManager forward get
+    case GetAllWorkers =>
+      sender ! WorkerList(workers.values.toList)
+    case GetMasterData =>
+      val aliveFor = System.currentTimeMillis() - birth
+      val logFileDir = LogUtil.daemonLogDir(systemConfig).getAbsolutePath
+      val userDir = System.getProperty("user.dir")
+
+      val masterDescription =
+        MasterSummary(
+          MasterNode(hostPort.host, hostPort.port),
+          masters,
+          aliveFor,
+          logFileDir,
+          jarStoreRootPath,
+          MasterStatus.Synced,
+          userDir,
+          List.empty[MasterActivity],
+          jvmName = ManagementFactory.getRuntimeMXBean().getName(),
+          historyMetricsConfig = getHistoryMetricsConfig
+        )
+
+      sender ! MasterData(masterDescription)
+
+    case invalidAppMaster: InvalidAppMaster =>
+      appManager forward invalidAppMaster
+  }
+
+  import scala.util.{Failure, Success}
+
+  def onMasterListChange: Receive = {
+    case MasterListUpdated(masters: List[MasterNode]) =>
+      this.masters = masters
+  }
+
+  def clientMsgHandler: Receive = {
+    case app: SubmitApplication =>
+      LOG.debug(s"Receive from client, SubmitApplication $app")
+      appManager.forward(app)
+    case app: RestartApplication =>
+      LOG.debug(s"Receive from client, RestartApplication $app")
+      appManager.forward(app)
+    case app: ShutdownApplication =>
+      LOG.debug(s"Receive from client, Shutting down Application ${app.appId}")
+      scheduler ! ApplicationFinished(app.appId)
+      appManager.forward(app)
+    case app: ResolveAppId =>
+      LOG.debug(s"Receive from client, resolving appId ${app.appId} to 
ActorRef")
+      appManager.forward(app)
+    case resolve: ResolveWorkerId =>
+      LOG.debug(s"Receive from client, resolving workerId ${resolve.workerId}")
+      val worker = workers.find(_._2 == resolve.workerId)
+      worker match {
+        case Some(worker) => sender ! ResolveWorkerIdResult(Success(worker._1))
+        case None => sender ! ResolveWorkerIdResult(Failure(
+          new Exception(s"cannot find worker ${resolve.workerId}")))
+      }
+    case AppMastersDataRequest =>
+      LOG.debug("Master received AppMastersDataRequest")
+      appManager forward AppMastersDataRequest
+    case appMasterDataRequest: AppMasterDataRequest =>
+      LOG.debug("Master received AppMasterDataRequest")
+      appManager forward appMasterDataRequest
+    case query: QueryAppMasterConfig =>
+      LOG.debug("Master received QueryAppMasterConfig")
+      appManager forward query
+    case QueryMasterConfig =>
+      sender ! MasterConfig(ClusterConfig.filterOutDefaultConfig(systemConfig))
+  }
+
+  def disassociated: Receive = {
+    case disassociated: DisassociatedEvent =>
+      LOG.info(s" disassociated ${disassociated.remoteAddress}")
+  }
+
+  def terminationWatch: Receive = {
+    case t: Terminated =>
+      val actor = t.actor
+      LOG.info(s"worker ${actor.path} get terminated, is it due to network 
reason?" +
+        t.getAddressTerminated())
+
+      LOG.info("Let's filter out dead resources...")
+      // Filters out dead worker resource
+      if (workers.keySet.contains(actor)) {
+        scheduler ! WorkerTerminated(workers.get(actor).get)
+        workers -= actor
+      }
+  }
+
+  override def preStart(): Unit = {
+    val path = ActorUtil.getFullPath(context.system, self.path)
+    LOG.info(s"master path is $path")
+    val schedulerClass = Class.forName(
+      systemConfig.getString(Constants.GEARPUMP_SCHEDULING_SCHEDULER))
+
+    appManager = context.actorOf(Props(new AppManager(kvService, AppMasterLauncher)),
+      classOf[AppManager].getSimpleName)
+    scheduler = context.actorOf(Props(schedulerClass))
+    context.system.eventStream.subscribe(self, classOf[DisassociatedEvent])
+  }
+}
+
+object Master {
+  final val MASTER_GROUP = "master_group"
+
+  final val WORKER_ID = "next_worker_id"
+
+  case class WorkerTerminated(workerId: WorkerId)
+
+  case class MasterInfo(master: ActorRef, startTime: Long = 0L)
+
+  /** Notify the subscriber that master actor list has been updated */
+  case class MasterListUpdated(masters: List[MasterNode])
+
+  object MasterInfo {
+    def empty: MasterInfo = MasterInfo(null)
+  }
+
+  case class SlotStatus(totalSlots: Int, availableSlots: Int)
+}
\ No newline at end of file
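
Note: the hunk above only adds the individual Receive blocks (workerMsgHandler, appMasterMsgHandler, clientMsgHandler, terminationWatch, ...); the top-level receive that chains them is not shown in this excerpt. As a rough, standalone sketch of the orElse composition pattern these handlers rely on (illustrative Akka code, not the actual Master wiring):

    import akka.actor.{Actor, ActorSystem, Props}

    // Illustrative only: two partial Receive functions chained with orElse,
    // mirroring how the Master's handler blocks are meant to be combined.
    class ComposedActor extends Actor {
      def pingHandler: Receive = {
        case "ping" => sender() ! "pong"
      }

      def fallback: Receive = {
        case other => println(s"unhandled: $other")
      }

      // The first matching partial function wins, so ordering matters.
      override def receive: Receive = pingHandler orElse fallback
    }

    object ComposedActorExample extends App {
      val system = ActorSystem("sketch")
      val actor = system.actorOf(Props(new ComposedActor), "composed")
      actor ! "ping"
      actor ! 42
      system.terminate()
    }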

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/core/src/main/scala/org/apache/gearpump/cluster/scheduler/PriorityScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/scheduler/PriorityScheduler.scala b/core/src/main/scala/org/apache/gearpump/cluster/scheduler/PriorityScheduler.scala
new file mode 100644
index 0000000..623e3ff
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/scheduler/PriorityScheduler.scala
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.scheduler
+
+import akka.actor.ActorRef
+import org.apache.gearpump.cluster.AppMasterToMaster.RequestResource
+import org.apache.gearpump.cluster.MasterToAppMaster.ResourceAllocated
+import org.apache.gearpump.cluster.scheduler.Relaxation._
+import org.apache.gearpump.cluster.scheduler.Scheduler.PendingRequest
+import org.apache.gearpump.cluster.worker.WorkerId
+
+import scala.collection.mutable
+
+/** Assign resource to application based on the priority of the application */
+class PriorityScheduler extends Scheduler {
+  private var resourceRequests = new mutable.PriorityQueue[PendingRequest]()(requestOrdering)
+
+  def requestOrdering: Ordering[PendingRequest] = new Ordering[PendingRequest] {
+    override def compare(x: PendingRequest, y: PendingRequest): Int = {
+      var res = x.request.priority.id - y.request.priority.id
+      if (res == 0) {
+        res = y.timeStamp.compareTo(x.timeStamp)
+      }
+      res
+    }
+  }
+
+  override def receive: Receive = super.handleScheduleMessage orElse resourceRequestHandler
+
+  override def allocateResource(): Unit = {
+    var scheduleLater = Array.empty[PendingRequest]
+    val resourcesSnapShot = resources.clone()
+    var allocated = Resource.empty
+    val totalResource = Resource(resourcesSnapShot.values.map(_._2.slots).sum)
+
+    while (resourceRequests.nonEmpty && (allocated < totalResource)) {
+      val PendingRequest(appId, appMaster, request, timeStamp) = resourceRequests.dequeue()
+      request.relaxation match {
+        case ANY =>
+          val allocations = allocateFairly(resourcesSnapShot, request)
+          val newAllocated = Resource(allocations.map(_.resource.slots).sum)
+          if (allocations.nonEmpty) {
+            appMaster ! ResourceAllocated(allocations.toArray)
+          }
+          if (newAllocated < request.resource) {
+            val remainingRequest = request.resource - newAllocated
+            val remainingExecutors = request.executorNum - allocations.length
+            val newResourceRequest = request.copy(resource = remainingRequest,
+              executorNum = remainingExecutors)
+            scheduleLater = scheduleLater :+
+              PendingRequest(appId, appMaster, newResourceRequest, timeStamp)
+          }
+          allocated = allocated + newAllocated
+        case ONEWORKER =>
+          val availableResource = resourcesSnapShot.find { params =>
+            val (_, (_, resource)) = params
+            resource > request.resource
+          }
+          if (availableResource.nonEmpty) {
+            val (workerId, (worker, resource)) = availableResource.get
+            allocated = allocated + request.resource
+            appMaster ! ResourceAllocated(Array(ResourceAllocation(request.resource, worker,
+              workerId)))
+            resourcesSnapShot.update(workerId, (worker, resource - request.resource))
+          } else {
+            scheduleLater = scheduleLater :+ PendingRequest(appId, appMaster, request, timeStamp)
+          }
+        case SPECIFICWORKER =>
+          val workerAndResource = resourcesSnapShot.get(request.workerId)
+          if (workerAndResource.nonEmpty && workerAndResource.get._2 > 
request.resource) {
+            val (worker, availableResource) = workerAndResource.get
+            appMaster ! ResourceAllocated(Array(ResourceAllocation(request.resource, worker,
+              request.workerId)))
+            allocated = allocated + request.resource
+            resourcesSnapShot.update(request.workerId, (worker,
+              availableResource - request.resource))
+          } else {
+            scheduleLater = scheduleLater :+ PendingRequest(appId, appMaster, request, timeStamp)
+          }
+      }
+    }
+    for (request <- scheduleLater)
+      resourceRequests.enqueue(request)
+  }
+
+  def resourceRequestHandler: Receive = {
+    case RequestResource(appId, request) =>
+      LOG.info(s"Request resource: appId: $appId, slots: 
${request.resource.slots}, " +
+        s"relaxation: ${request.relaxation}, executor number: 
${request.executorNum}")
+      val appMaster = sender()
+      resourceRequests.enqueue(new PendingRequest(appId, appMaster, request,
+        System.currentTimeMillis()))
+      allocateResource()
+  }
+
+  override def doneApplication(appId: Int): Unit = {
+    resourceRequests = resourceRequests.filter(_.appId != appId)
+  }
+
+  private def allocateFairly(
+      resources: mutable.HashMap[WorkerId, (ActorRef, Resource)], request: ResourceRequest)
+    : List[ResourceAllocation] = {
+    val workerNum = resources.size
+    var allocations = List.empty[ResourceAllocation]
+    var totalAvailable = Resource(resources.values.map(_._2.slots).sum)
+    var remainingRequest = request.resource
+    var remainingExecutors = Math.min(request.executorNum, request.resource.slots)
+
+    while (remainingExecutors > 0 && !totalAvailable.isEmpty) {
+      val executorNum = Math.min(workerNum, remainingExecutors)
+      val toRequest = Resource(remainingRequest.slots * executorNum / remainingExecutors)
+
+      val sortedResources = resources.toArray.sortBy(_._2._2.slots)(Ordering[Int].reverse)
+      val pickedResources = sortedResources.take(executorNum)
+
+      val flattenResource = pickedResources.zipWithIndex.flatMap { workerWithIndex =>
+        val ((workerId, (worker, resource)), index) = workerWithIndex
+        0.until(resource.slots).map(seq => ((workerId, worker), seq * workerNum + index))
+      }.sortBy(_._2).map(_._1)
+
+      if (flattenResource.length < toRequest.slots) {
+        // Cannot satisfy the user's requirements
+        totalAvailable = Resource.empty
+      } else {
+        flattenResource.take(toRequest.slots).groupBy(actor => actor).mapValues(_.length).
+          toArray.foreach { params =>
+          val ((workerId, worker), slots) = params
+          resources.update(workerId, (worker, resources.get(workerId).get._2 - Resource(slots)))
+          allocations :+= ResourceAllocation(Resource(slots), worker, workerId)
+        }
+        totalAvailable -= toRequest
+        remainingRequest -= toRequest
+        remainingExecutors -= executorNum
+      }
+      }
+    }
+    allocations
+  }
+}
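
For readers skimming the diff: requestOrdering dequeues the highest application priority first and, on a tie, the oldest request first (y.timeStamp.compareTo(x.timeStamp) makes the earlier timestamp compare as larger). A small standalone sketch of that ordering using a stand-in type (Req below is hypothetical, not the Gearpump PendingRequest):

    import scala.collection.mutable

    // Stand-in type for illustration only.
    case class Req(priority: Int, timeStamp: Long, name: String)

    object OrderingSketch extends App {
      val ordering: Ordering[Req] = new Ordering[Req] {
        override def compare(x: Req, y: Req): Int = {
          var res = x.priority - y.priority
          if (res == 0) {
            // The earlier timestamp compares as "larger", so it dequeues first.
            res = y.timeStamp.compareTo(x.timeStamp)
          }
          res
        }
      }

      val queue = new mutable.PriorityQueue[Req]()(ordering)
      queue.enqueue(Req(priority = 1, timeStamp = 200L, name = "low-late"))
      queue.enqueue(Req(priority = 3, timeStamp = 300L, name = "high"))
      queue.enqueue(Req(priority = 1, timeStamp = 100L, name = "low-early"))

      // Prints: high, low-early, low-late
      while (queue.nonEmpty) println(queue.dequeue().name)
    }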

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/core/src/main/scala/org/apache/gearpump/cluster/scheduler/Scheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/scheduler/Scheduler.scala b/core/src/main/scala/org/apache/gearpump/cluster/scheduler/Scheduler.scala
new file mode 100644
index 0000000..ec9f1ba
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/scheduler/Scheduler.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster.scheduler
+
+import akka.actor.{Actor, ActorRef}
+import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.cluster.MasterToWorker.{UpdateResourceFailed, UpdateResourceSucceed, WorkerRegistered}
+import org.apache.gearpump.cluster.WorkerToMaster.ResourceUpdate
+import org.apache.gearpump.cluster.master.Master.WorkerTerminated
+import org.apache.gearpump.cluster.scheduler.Scheduler.ApplicationFinished
+import org.apache.gearpump.cluster.worker.WorkerId
+import org.apache.gearpump.util.LogUtil
+import org.slf4j.Logger
+
+import scala.collection.mutable
+
+/**
+ * Scheduler schedules resources for different applications.
+ */
+abstract class Scheduler extends Actor {
+  val LOG: Logger = LogUtil.getLogger(getClass)
+  protected var resources = new mutable.HashMap[WorkerId, (ActorRef, Resource)]
+
+  def handleScheduleMessage: Receive = {
+    case WorkerRegistered(id, _) =>
+      if (!resources.contains(id)) {
+        LOG.info(s"Worker $id added to the scheduler")
+        resources.put(id, (sender, Resource.empty))
+      }
+    case update@ResourceUpdate(worker, workerId, resource) =>
+      LOG.info(s"$update...")
+      if (resources.contains(workerId)) {
+        val resourceReturned = resource > resources.get(workerId).get._2
+        resources.update(workerId, (worker, resource))
+        if (resourceReturned) {
+          allocateResource()
+        }
+        sender ! UpdateResourceSucceed
+      }
+      else {
+        sender ! UpdateResourceFailed(
+          s"ResourceUpdate failed! The worker $workerId has not been 
registered into master")
+      }
+    case WorkerTerminated(workerId) =>
+      if (resources.contains(workerId)) {
+        resources -= workerId
+      }
+    case ApplicationFinished(appId) =>
+      doneApplication(appId)
+  }
+
+  def allocateResource(): Unit
+
+  def doneApplication(appId: Int): Unit
+}
+
+object Scheduler {
+  case class PendingRequest(
+      appId: Int, appMaster: ActorRef, request: ResourceRequest, timeStamp: TimeStamp)
+
+  case class ApplicationFinished(appId: Int)
+}
\ No newline at end of file
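
handleScheduleMessage keeps the workerId -> (worker, resource) map current; a concrete scheduler only has to implement allocateResource() and doneApplication(). A rough sketch of what a trivial subclass could look like (illustrative only; PriorityScheduler above is the implementation this commit actually moves):

    package org.apache.gearpump.cluster.scheduler

    // Illustrative only: logs capacity instead of queueing requests.
    class NoopScheduler extends Scheduler {
      override def receive: Receive = handleScheduleMessage

      override def allocateResource(): Unit = {
        val totalSlots = resources.values.map(_._2.slots).sum
        LOG.info(s"Cluster capacity changed, total available slots: $totalSlots")
      }

      override def doneApplication(appId: Int): Unit = {
        LOG.info(s"Application $appId finished, nothing to clean up")
      }
    }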

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/core/src/main/scala/org/apache/gearpump/cluster/worker/DefaultExecutorProcessLauncher.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/worker/DefaultExecutorProcessLauncher.scala b/core/src/main/scala/org/apache/gearpump/cluster/worker/DefaultExecutorProcessLauncher.scala
new file mode 100644
index 0000000..3d5b0af
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/worker/DefaultExecutorProcessLauncher.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster.worker
+
+import java.io.File
+
+import com.typesafe.config.Config
+import org.apache.gearpump.cluster.scheduler.Resource
+import org.apache.gearpump.util.{LogUtil, RichProcess, Util}
+import org.slf4j.Logger
+
+/** Launcher to start an executor process */
+class DefaultExecutorProcessLauncher(val config: Config) extends ExecutorProcessLauncher {
+  private val LOG: Logger = LogUtil.getLogger(getClass)
+
+  override def createProcess(
+      appId: Int, executorId: Int, resource: Resource, config: Config, options: Array[String],
+      classPath: Array[String], mainClass: String, arguments: Array[String]): RichProcess = {
+
+    LOG.info(s"Launch executor $executorId, classpath: 
${classPath.mkString(File.pathSeparator)}")
+    Util.startProcess(options, classPath, mainClass, arguments)
+  }
+
+  override def cleanProcess(appId: Int, executorId: Int): Unit = {}
+}
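
The launcher is a thin wrapper over Util.startProcess. A hedged usage sketch, with the classpath, main class and JVM options below made up purely for illustration:

    import com.typesafe.config.ConfigFactory
    import org.apache.gearpump.cluster.scheduler.Resource
    import org.apache.gearpump.cluster.worker.DefaultExecutorProcessLauncher

    object LauncherSketch extends App {
      // Hypothetical values; a real Worker fills these in from the executor context.
      val config = ConfigFactory.load()
      val launcher = new DefaultExecutorProcessLauncher(config)

      val richProcess = launcher.createProcess(
        appId = 1,
        executorId = 0,
        resource = Resource(1),
        config = config,
        options = Array("-Xmx512m"),
        classPath = Array("/path/to/gearpump/lib/*"),
        mainClass = "org.apache.gearpump.example.HypotheticalExecutor",
        arguments = Array.empty[String])

      // richProcess is the handle to the spawned executor JVM.
    }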
