http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/io/gearpump/cluster/main/AppSubmitter.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/cluster/main/AppSubmitter.scala 
b/daemon/src/main/scala/io/gearpump/cluster/main/AppSubmitter.scala
deleted file mode 100644
index 68f778e..0000000
--- a/daemon/src/main/scala/io/gearpump/cluster/main/AppSubmitter.scala
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.cluster.main
-
-import java.io.File
-import java.net.{URL, URLClassLoader}
-import java.util.jar.JarFile
-import scala.util.Try
-
-import org.slf4j.Logger
-
-import io.gearpump.util.{AkkaApp, Constants, LogUtil, Util}
-
-/**
- * Tool to submit an application jar to cluster.
- *
- * Parses the command line, publishes the jar path, the executor number and the optional
- * application name prefix as system properties, and then reflectively invokes the main
- * class of the application with a class loader that contains the jar.
- */
-object AppSubmitter extends AkkaApp with ArgumentsParser {
-  val LOG: Logger = LogUtil.getLogger(getClass)
-
-  override val ignoreUnknownArgument = true
-
-  override val description = "Submit an application to Master by providing a jar"
-
-  override val options: Array[(String, CLIOption[Any])] = Array(
-    "namePrefix" -> CLIOption[String]("<application name prefix>", required = false,
-      defaultValue = Some("")),
-    "jar" -> CLIOption("<application>.jar", required = true),
-    "executors" -> CLIOption[Int]("number of executor to launch", required = false,
-      defaultValue = Some(1)),
-    "verbose" -> CLIOption("<print verbose log on console>", required = false,
-      defaultValue = Some(false)),
-    // For document purpose only, OPTION_CONFIG option is not used here.
-    // OPTION_CONFIG is parsed by parent shell command "Gear" transparently.
-    Gear.OPTION_CONFIG -> CLIOption("custom configuration file", required = false,
-      defaultValue = None))
-
-  /**
-   * Submits the application described by the command line.
-   *
-   * Does nothing when `parse` returns null.
-   *
-   * @param akkaConf not read by this method
-   * @param args command line arguments; arguments left over after option parsing are
-   *             handed to the application's main class
-   * @throws Exception if the name prefix is invalid, the jar does not exist, or no main
-   *                   class can be determined
-   */
-  def main(akkaConf: Config, args: Array[String]): Unit = {
-
-    val config = parse(args)
-    if (null != config) {
-
-      if (config.getBoolean("verbose")) {
-        LogUtil.verboseLogToConsole()
-      }
-
-      val jar = config.getString("jar")
-
-      // Set jar path to be submitted to cluster
-      System.setProperty(Constants.GEARPUMP_APP_JAR, jar)
-      System.setProperty(Constants.APPLICATION_EXECUTOR_NUMBER,
-        config.getInt("executors").toString)
-
-      val namePrefix = config.getString("namePrefix")
-      if (namePrefix.nonEmpty) {
-        if (!Util.validApplicationName(namePrefix)) {
-          throw new Exception(s"$namePrefix is not a valid prefix for an application name")
-        }
-        System.setProperty(Constants.GEARPUMP_APP_NAME_PREFIX, namePrefix)
-      }
-
-      val jarFile = new File(jar)
-      if (!jarFile.exists()) {
-        throw new Exception(s"jar $jar does not exist")
-      }
-
-      // File.toURI escapes characters that are illegal in a URL, which plain string
-      // concatenation with "file:" does not.
-      val classLoader: URLClassLoader = new URLClassLoader(Array(jarFile.toURI.toURL),
-        Thread.currentThread().getContextClassLoader())
-      val (mainClass, arguments) = parseMain(jarFile, config.remainArgs, classLoader)
-
-      // Set to context classloader. ActorSystem pick context classloader in preference
-      Thread.currentThread().setContextClassLoader(classLoader)
-
-      // Start main class
-      val clazz = classLoader.loadClass(mainClass)
-      val mainMethod = clazz.getMethod("main", classOf[Array[String]])
-      mainMethod.invoke(null, arguments)
-    }
-  }
-
-  /**
-   * Decides which class to run and which arguments to pass to it.
-   *
-   * The first remaining argument wins when it can be loaded as a class by `classLoader`;
-   * otherwise the "Main-Class" attribute of the jar manifest is used.
-   *
-   * @param jar the application jar
-   * @param remainArgs arguments left over after option parsing
-   * @param classLoader class loader used to probe the first remaining argument
-   * @return the main class name and the arguments to pass to its main method
-   * @throws Exception if neither source yields a main class
-   */
-  private def parseMain(jar: File, remainArgs: Array[String], classLoader: ClassLoader)
-    : (String, Array[String]) = {
-    val jarHandle = new JarFile(jar)
-    val mainInManifest = try {
-      // getManifest returns null for a jar without a manifest, so both lookups are guarded.
-      Option(jarHandle.getManifest)
-        .flatMap(manifest => Option(manifest.getMainAttributes.getValue("Main-Class")))
-        .getOrElse("")
-    } finally {
-      // Release the file handle; the jar is only read for its manifest here.
-      jarHandle.close()
-    }
-
-    if (remainArgs.length > 0 && Try(classLoader.loadClass(remainArgs(0))).isSuccess) {
-      (remainArgs(0), remainArgs.drop(1))
-    } else if (mainInManifest.nonEmpty) {
-      (mainInManifest, remainArgs)
-    } else {
-      throw new Exception("No main class specified")
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/io/gearpump/cluster/main/Gear.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/cluster/main/Gear.scala 
b/daemon/src/main/scala/io/gearpump/cluster/main/Gear.scala
deleted file mode 100644
index 4423727..0000000
--- a/daemon/src/main/scala/io/gearpump/cluster/main/Gear.scala
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.cluster.main
-
-import org.slf4j.Logger
-
-import io.gearpump.util.{Constants, LogUtil}
-
-/**
- * Command line entry point that dispatches to the sub command tools registered in
- * `commands`, after stripping the custom configuration file option from the arguments.
- */
-object Gear {
-
-  val OPTION_CONFIG = "conf"
-
-  private val LOG: Logger = LogUtil.getLogger(getClass)
-
-  val commands = Map("app" -> AppSubmitter, "kill" -> Kill,
-    "info" -> Info, "replay" -> Replay, "main" -> MainRunner)
-
-  /** Prints the sorted list of known sub commands to standard error. */
-  def usage(): Unit = {
-    val keys = commands.keys.toList.sorted
-    // scalastyle:off println
-    Console.err.println("Usage: " + "<" + keys.mkString("|") + ">")
-    // scalastyle:on println
-  }
-
-  /**
-   * Runs a registered sub command; an unknown command is handed to MainRunner together
-   * with its arguments, with the command itself as the first argument.
-   */
-  private def executeCommand(command: String, commandArgs: Array[String]): Unit = {
-    commands.get(command) match {
-      case Some(tool) => tool.main(commandArgs)
-      case None => MainRunner.main(command +: commandArgs)
-    }
-  }
-
-  /**
-   * Extracts the custom configuration file (if any), publishes it as a system property,
-   * and executes the sub command named by the first remaining argument. Prints the usage
-   * when no argument remains.
-   *
-   * @param inputArgs raw command line arguments
-   * @throws IllegalArgumentException if the configuration option has no value
-   */
-  def main(inputArgs: Array[String]): Unit = {
-    val (configFile, args) = extractConfig(inputArgs)
-    if (configFile != null) {
-      // Sets custom config file...
-      System.setProperty(Constants.GEARPUMP_CUSTOM_CONFIG_FILE, configFile)
-    }
-
-    if (args.length == 0) {
-      usage()
-    } else {
-      executeCommand(args(0), args.drop(1))
-    }
-  }
-
-  /**
-   * Splits the configuration file option out of the argument list.
-   *
-   * @param inputArgs raw command line arguments
-   * @return the configuration file (null when the option is absent; the last occurrence
-   *         wins) and the remaining arguments in their original order
-   * @throws IllegalArgumentException if the option is the last argument and therefore has
-   *                                  no value
-   */
-  private def extractConfig(inputArgs: Array[String]): (String, Array[String]) = {
-    val configFlag = s"-$OPTION_CONFIG"
-    // A builder keeps collecting the remaining arguments linear in their number.
-    val remaining = Array.newBuilder[String]
-    var configFile: String = null
-    var index = 0
-
-    while (index < inputArgs.length) {
-      val item = inputArgs(index)
-      if (item == configFlag) {
-        if (index + 1 >= inputArgs.length) {
-          throw new IllegalArgumentException(
-            s"Option $configFlag requires a configuration file path")
-        }
-        configFile = inputArgs(index + 1)
-        // Skip both the flag and its value.
-        index += 2
-      } else {
-        remaining += item
-        index += 1
-      }
-    }
-    (configFile, remaining.result())
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/io/gearpump/cluster/main/Info.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/cluster/main/Info.scala 
b/daemon/src/main/scala/io/gearpump/cluster/main/Info.scala
deleted file mode 100644
index 4922690..0000000
--- a/daemon/src/main/scala/io/gearpump/cluster/main/Info.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.cluster.main
-
-import org.slf4j.Logger
-
-import io.gearpump.cluster.MasterToAppMaster.AppMastersData
-import io.gearpump.cluster.client.ClientContext
-import io.gearpump.util.{AkkaApp, LogUtil}
-
-/** Tool to query master info: prints every application returned by the client. */
-object Info extends AkkaApp with ArgumentsParser {
-
-  private val LOG: Logger = LogUtil.getLogger(getClass)
-
-  override val options: Array[(String, CLIOption[Any])] = Array(
-    // For document purpose only, OPTION_CONFIG option is not used here.
-    // OPTION_CONFIG is parsed by parent shell command "Gear" transparently.
-    Gear.OPTION_CONFIG -> CLIOption("custom configuration file", required = false,
-      defaultValue = None))
-
-  override val description = "Query the Application list"
-
-  // scalastyle:off println
-  /**
-   * Lists the applications and prints one line per application to the console.
-   *
-   * @param akkaConf configuration used to create the client
-   * @param args not read by this method
-   */
-  def main(akkaConf: Config, args: Array[String]): Unit = {
-    val client = ClientContext(akkaConf)
-    try {
-      val AppMastersData(appMasters) = client.listApps
-      Console.println("== Application Information ==")
-      Console.println("====================================")
-      appMasters.foreach { appData =>
-        Console.println(s"application: ${appData.appId}, name: ${appData.appName}, " +
-          s"status: ${appData.status}, worker: ${appData.workerPath}")
-      }
-    } finally {
-      // Close the client even when the query fails.
-      client.close()
-    }
-  }
-  // scalastyle:on println
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/io/gearpump/cluster/main/Kill.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/cluster/main/Kill.scala 
b/daemon/src/main/scala/io/gearpump/cluster/main/Kill.scala
deleted file mode 100644
index 3ce781f..0000000
--- a/daemon/src/main/scala/io/gearpump/cluster/main/Kill.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.cluster.main
-
-import org.slf4j.Logger
-
-import io.gearpump.cluster.client.ClientContext
-import io.gearpump.util.{AkkaApp, LogUtil}
-
-/** Tool to kill an App */
-object Kill extends AkkaApp with ArgumentsParser {
-
-  private val LOG: Logger = LogUtil.getLogger(getClass)
-
-  override val options: Array[(String, CLIOption[Any])] = Array(
-    "appid" -> CLIOption("<application id>", required = true),
-    // For document purpose only, OPTION_CONFIG option is not used here.
-    // OPTION_CONFIG is parsed by parent shell command "Gear" transparently.
-    Gear.OPTION_CONFIG -> CLIOption("custom configuration file", required = false,
-      defaultValue = None))
-
-  override val description = "Kill an application with application Id"
-
-  /**
-   * Asks the client to shut down the application given by the "appid" option.
-   *
-   * Does nothing when `parse` returns null.
-   *
-   * @param akkaConf configuration used to create the client
-   * @param args command line arguments
-   */
-  def main(akkaConf: Config, args: Array[String]): Unit = {
-    val config = parse(args)
-
-    if (null != config) {
-      val appId = config.getInt("appid")
-      val client = ClientContext(akkaConf)
-      try {
-        LOG.info(s"Client is shutting down application $appId")
-        client.shutdown(appId)
-      } finally {
-        // Close the client even when the shutdown request fails.
-        client.close()
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/io/gearpump/cluster/main/Local.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/cluster/main/Local.scala 
b/daemon/src/main/scala/io/gearpump/cluster/main/Local.scala
deleted file mode 100644
index d5681df..0000000
--- a/daemon/src/main/scala/io/gearpump/cluster/main/Local.scala
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.cluster.main
-
-import scala.collection.JavaConverters._
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-
-import akka.actor.{ActorSystem, Props}
-import com.typesafe.config.ConfigValueFactory
-import org.slf4j.Logger
-
-import io.gearpump.cluster.ClusterConfig
-import io.gearpump.cluster.master.{Master => MasterActor}
-import io.gearpump.cluster.worker.{Worker => WorkerActor}
-import io.gearpump.util.Constants._
-import io.gearpump.util.LogUtil.ProcessType
-import io.gearpump.util.{ActorUtil, AkkaApp, Constants, LogUtil, Util}
-
-/** Tool to start a local cluster: one master and several workers in one actor system. */
-object Local extends AkkaApp with ArgumentsParser {
-  override def akkaConfig: Config = ClusterConfig.master()
-
-  var LOG: Logger = LogUtil.getLogger(getClass)
-
-  override val options: Array[(String, CLIOption[Any])] =
-    Array("sameprocess" -> CLIOption[Boolean]("", required = false, defaultValue = Some(false)),
-      "workernum" -> CLIOption[Int]("<how many workers to start>", required = false,
-        defaultValue = Some(2)))
-
-  override val description = "Start a local cluster"
-
-  /**
-   * Loads the logging configuration for the LOCAL process type and starts the local
-   * cluster with the parsed options. Does nothing more when `parse` returns null.
-   *
-   * @param akkaConf configuration for logging and for the actor system
-   * @param args command line arguments
-   */
-  def main(akkaConf: Config, args: Array[String]): Unit = {
-
-    // The logger is re-created after the logging configuration has been loaded.
-    this.LOG = {
-      LogUtil.loadConfiguration(akkaConf, ProcessType.LOCAL)
-      LogUtil.getLogger(getClass)
-    }
-
-    val config = parse(args)
-    if (null != config) {
-      local(config.getInt("workernum"), config.getBoolean("sameprocess"), akkaConf)
-    }
-  }
-
-  /**
-   * Starts a master actor and `workerCount` worker actors in a single actor system and
-   * blocks until that actor system terminates.
-   *
-   * Logs an error and returns without starting anything when no master is configured or
-   * when the configured masters do not match the configured host name.
-   *
-   * @param workerCount number of worker actors to create
-   * @param sameProcess when true, the system property "LOCAL" is set to "true"
-   * @param akkaConf configuration providing the master list and the host name
-   */
-  def local(workerCount: Int, sameProcess: Boolean, akkaConf: Config): Unit = {
-    if (sameProcess) {
-      LOG.info("Starting local in same process")
-      System.setProperty("LOCAL", "true")
-    }
-    val masters = akkaConf.getStringList(Constants.GEARPUMP_CLUSTER_MASTERS)
-      .asScala.flatMap(Util.parseHostList)
-    val local = akkaConf.getString(Constants.GEARPUMP_HOSTNAME)
-
-    masters.headOption match {
-      case None =>
-        // Without this case an empty master list fails with NoSuchElementException.
-        LOG.error(s"No master is configured at ${Constants.GEARPUMP_CLUSTER_MASTERS}")
-
-      // NOTE(review): "&&" only rejects a multi-master list whose first host differs from
-      // the local host name; "||" may have been intended - confirm before changing.
-      case Some(hostPort) if masters.size != 1 && hostPort.host != local =>
-        LOG.error(s"The ${Constants.GEARPUMP_CLUSTER_MASTERS} is not match " +
-          s"with ${Constants.GEARPUMP_HOSTNAME}")
-
-      case Some(hostPort) =>
-        implicit val system = ActorSystem(MASTER, akkaConf.
-          withValue("akka.remote.netty.tcp.port", ConfigValueFactory.fromAnyRef(hostPort.port))
-        )
-
-        val master = system.actorOf(Props[MasterActor], MASTER)
-
-        0.until(workerCount).foreach { id =>
-          system.actorOf(Props(classOf[WorkerActor], master),
-            classOf[WorkerActor].getSimpleName + id)
-        }
-
-        Await.result(system.whenTerminated, Duration.Inf)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/io/gearpump/cluster/main/MainRunner.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/cluster/main/MainRunner.scala 
b/daemon/src/main/scala/io/gearpump/cluster/main/MainRunner.scala
deleted file mode 100644
index 923a646..0000000
--- a/daemon/src/main/scala/io/gearpump/cluster/main/MainRunner.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.cluster.main
-
-import org.slf4j.Logger
-
-import io.gearpump.util.{AkkaApp, LogUtil}
-
-/** Tool to run any main class by providing a jar */
-object MainRunner extends AkkaApp with ArgumentsParser {
-  private val LOG: Logger = LogUtil.getLogger(getClass)
-
-  override val options: Array[(String, CLIOption[Any])] = Array(
-    // For document purpose only, OPTION_CONFIG option is not used here.
-    // OPTION_CONFIG is parsed by parent shell command "Gear" transparently.
-    Gear.OPTION_CONFIG -> CLIOption("custom configuration file", required = false,
-      defaultValue = None))
-
-  /**
-   * Loads the class named by the first argument through the context class loader and
-   * invokes its static main method with the remaining arguments.
-   *
-   * @param akkaConf not read by this method
-   * @param args the main class name followed by the arguments for that class
-   * @throws IllegalArgumentException if `args` is empty
-   */
-  def main(akkaConf: Config, args: Array[String]): Unit = {
-    // Fail with a clear message instead of an ArrayIndexOutOfBoundsException.
-    val mainClazz = args.headOption.getOrElse(
-      throw new IllegalArgumentException("No main class specified"))
-    val commandArgs = args.drop(1)
-
-    val clazz = Thread.currentThread().getContextClassLoader().loadClass(mainClazz)
-    val mainMethod = clazz.getMethod("main", classOf[Array[String]])
-    mainMethod.invoke(null, commandArgs)
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/io/gearpump/cluster/main/Master.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/cluster/main/Master.scala 
b/daemon/src/main/scala/io/gearpump/cluster/main/Master.scala
deleted file mode 100644
index eac1c54..0000000
--- a/daemon/src/main/scala/io/gearpump/cluster/main/Master.scala
+++ /dev/null
@@ -1,236 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.cluster.main
-
-import java.util.concurrent.TimeUnit
-import scala.collection.JavaConverters._
-import scala.collection.immutable
-import scala.concurrent.Await
-import scala.concurrent.duration._
-
-import akka.actor._
-import akka.cluster.ClusterEvent._
-import akka.cluster.ddata.DistributedData
-import akka.cluster.singleton.{ClusterSingletonManager, 
ClusterSingletonManagerSettings, ClusterSingletonProxy, 
ClusterSingletonProxySettings}
-import akka.cluster.{Cluster, Member, MemberStatus}
-import com.typesafe.config.ConfigValueFactory
-import org.slf4j.Logger
-
-import io.gearpump.cluster.ClusterConfig
-import io.gearpump.cluster.master.{Master => MasterActor, MasterNode}
-import io.gearpump.cluster.master.Master.MasterListUpdated
-import io.gearpump.util.Constants._
-import io.gearpump.util.LogUtil.ProcessType
-import io.gearpump.util.{AkkaApp, Constants, LogUtil}
-
-/** Tool to start a master daemon that joins the cluster of configured masters. */
-object Master extends AkkaApp with ArgumentsParser {
-
-  private var LOG: Logger = LogUtil.getLogger(getClass)
-
-  override def akkaConfig: Config = ClusterConfig.master()
-
-  override val options: Array[(String, CLIOption[Any])] =
-    Array("ip" -> CLIOption[String]("<master ip address>", required = true),
-      "port" -> CLIOption("<master port>", required = true))
-
-  override val description = "Start Master daemon"
-
-  /**
-   * Loads the logging configuration for the MASTER process type and starts the master
-   * with the "ip" and "port" options. Does nothing more when `parse` returns null, which
-   * is the same guard the other tools in this package apply.
-   *
-   * @param akkaConf configuration for logging and for the actor system
-   * @param args command line arguments
-   */
-  def main(akkaConf: Config, args: Array[String]): Unit = {
-
-    // The logger is re-created after the logging configuration has been loaded.
-    this.LOG = {
-      LogUtil.loadConfiguration(akkaConf, ProcessType.MASTER)
-      LogUtil.getLogger(getClass)
-    }
-
-    val config = parse(args)
-    if (null != config) {
-      master(config.getString("ip"), config.getInt("port"), akkaConf)
-    }
-  }
-
-  /** Returns true when "master:port" is literally one of the configured masters. */
-  private def verifyMaster(master: String, port: Int, masters: Iterable[String]) = {
-    masters.exists { hostPort =>
-      hostPort == s"$master:$port"
-    }
-  }
-
-  /**
-   * Starts the master actor system on `ip`:`port`, creates the singleton manager and the
-   * master proxy, registers a shutdown hook and blocks until the actor system terminates.
-   *
-   * Exits the JVM with status -1 when `ip`:`port` is not in the configured master list.
-   */
-  private def master(ip: String, port: Int, akkaConf: Config): Unit = {
-    val masters = akkaConf.getStringList(Constants.GEARPUMP_CLUSTER_MASTERS).asScala
-
-    if (!verifyMaster(ip, port, masters)) {
-      LOG.error(s"The provided ip $ip and port $port doesn't conform with config at " +
-        s"gearpump.cluster.masters: ${masters.mkString(", ")}")
-      System.exit(-1)
-    }
-
-    val masterList = masters.map(master => s"akka.tcp://${MASTER}@$master").toList.asJava
-    // Majority of the configured masters.
-    val quorum = masterList.size() / 2 + 1
-    val masterConfig = akkaConf.
-      withValue("akka.remote.netty.tcp.port", ConfigValueFactory.fromAnyRef(port)).
-      withValue(NETTY_TCP_HOSTNAME, ConfigValueFactory.fromAnyRef(ip)).
-      withValue("akka.cluster.seed-nodes", ConfigValueFactory.fromAnyRef(masterList)).
-      withValue(s"akka.cluster.role.${MASTER}.min-nr-of-members",
-        ConfigValueFactory.fromAnyRef(quorum))
-
-    LOG.info(s"Starting Master Actor system $ip:$port, master list: ${masters.mkString(";")}")
-    val system = ActorSystem(MASTER, masterConfig)
-
-    val replicator = DistributedData(system).replicator
-    LOG.info(s"Replicator path: ${replicator.path}")
-
-    // Starts singleton manager
-    val singletonManager = system.actorOf(ClusterSingletonManager.props(
-      singletonProps = Props(classOf[MasterWatcher], MASTER),
-      terminationMessage = PoisonPill,
-      settings = ClusterSingletonManagerSettings(system).withSingletonName(MASTER_WATCHER)
-        .withRole(MASTER)),
-      name = SINGLETON_MANAGER)
-
-    // Start master proxy
-    val masterProxy = system.actorOf(ClusterSingletonProxy.props(
-      singletonManagerPath = s"/user/${SINGLETON_MANAGER}",
-      // The effective singleton is s"${MASTER_WATCHER}/$MASTER" instead of s"${MASTER_WATCHER}".
-      // Master is created when there is a majority of machines started.
-      settings = ClusterSingletonProxySettings(system)
-        .withSingletonName(s"${MASTER_WATCHER}/$MASTER").withRole(MASTER)),
-      name = MASTER
-    )
-
-    LOG.info(s"master proxy is started at ${masterProxy.path}")
-
-    val mainThread = Thread.currentThread()
-    Runtime.getRuntime().addShutdownHook(new Thread() {
-      override def run(): Unit = {
-        if (!system.whenTerminated.isCompleted) {
-          LOG.info("Triggering shutdown hook....")
-
-          system.stop(masterProxy)
-          val cluster = Cluster(system)
-          cluster.leave(cluster.selfAddress)
-          cluster.down(cluster.selfAddress)
-          try {
-            // Wait up to 3 seconds for the system to terminate before forcing it.
-            Await.result(system.whenTerminated, Duration(3, TimeUnit.SECONDS))
-          } catch {
-            case ex: Exception => // Ignore
-          }
-          system.terminate()
-          mainThread.join()
-        }
-      }
-    })
-
-    Await.result(system.whenTerminated, Duration.Inf)
-  }
-}
-
-/**
- * Watches the cluster members that have the given role. It creates the master actor once
- * a quorum of such members is up, keeps the master informed about membership changes,
- * and shuts the actor system down when the quorum is lost.
- *
- * @param role the cluster role a member must have to be counted
- */
-class MasterWatcher(role: String) extends Actor with ActorLogging {
-  import context.dispatcher
-
-  val cluster = Cluster(context.system)
-
-  val config = context.system.settings.config
-  val masters = config.getList("akka.cluster.seed-nodes")
-  // Majority of the configured seed nodes.
-  val quorum = masters.size() / 2 + 1
-
-  val system = context.system
-
-  // Sorts by age, oldest first
-  val ageOrdering = Ordering.fromLessThan[Member] { (a, b) => a.isOlderThan(b) }
-  var membersByAge: immutable.SortedSet[Member] = immutable.SortedSet.empty(ageOrdering)
-
-  // The actual behavior is installed by preStart through context.become(waitForInit).
-  def receive: Receive = null
-
-  // Subscribes to MemberEvent, re-subscribe when restart
-  override def preStart(): Unit = {
-    cluster.subscribe(self, classOf[MemberEvent])
-    context.become(waitForInit)
-  }
-  override def postStop(): Unit = {
-    cluster.unsubscribe(self)
-  }
-
-  /** Returns true when the member has the role this watcher was created for. */
-  def matchingRole(member: Member): Boolean = member.hasRole(role)
-
-  /**
-   * Initial behavior: on the current cluster state, either creates the master actor
-   * (quorum reached) or starts the shutdown (quorum not reached).
-   */
-  def waitForInit: Receive = {
-    case state: CurrentClusterState => {
-      membersByAge = immutable.SortedSet.empty(ageOrdering) ++ state.members.filter(m =>
-        m.status == MemberStatus.Up && matchingRole(m))
-
-      if (membersByAge.size < quorum) {
-        shutdownOnLostQuorum()
-      } else {
-        val master = context.actorOf(Props(classOf[MasterActor]), MASTER)
-        notifyMasterMembersChange(master)
-        context.become(waitForClusterEvent(master))
-      }
-    }
-  }
-
-  /**
-   * Behavior once the master exists: tracks members with the matching role that come up,
-   * exit or are removed, and starts the shutdown when the quorum is lost.
-   */
-  def waitForClusterEvent(master: ActorRef): Receive = {
-    case MemberUp(m) if matchingRole(m) => {
-      membersByAge += m
-      notifyMasterMembersChange(master)
-    }
-    case mEvent: MemberEvent if (mEvent.isInstanceOf[MemberExited] ||
-      mEvent.isInstanceOf[MemberRemoved]) && matchingRole(mEvent.member) => {
-      log.info(s"member removed ${mEvent.member}")
-      val m = mEvent.member
-      membersByAge -= m
-      if (membersByAge.size < quorum) {
-        shutdownOnLostQuorum()
-      } else {
-        notifyMasterMembersChange(master)
-      }
-    }
-  }
-
-  /** Logs the lost quorum, switches to the shutdown behavior and triggers it. */
-  private def shutdownOnLostQuorum(): Unit = {
-    log.info(s"We cannot get a quorum, $quorum, " +
-      s"shutting down...${membersByAge.iterator.mkString(",")}")
-    context.become(waitForShutdown)
-    self ! MasterWatcher.Shutdown
-  }
-
-  /** Sends the current member list, oldest first, to the master actor. */
-  private def notifyMasterMembersChange(master: ActorRef): Unit = {
-    val masters = membersByAge.toList.map{ member =>
-      MasterNode(member.address.host.getOrElse("Unknown-Host"),
-        member.address.port.getOrElse(0))
-    }
-    master ! MasterListUpdated(masters)
-  }
-
-  /**
-   * Final behavior: leaves the cluster, stops this actor and terminates the actor system
-   * after waiting up to 3 seconds for it to terminate on its own.
-   */
-  def waitForShutdown: Receive = {
-    case MasterWatcher.Shutdown => {
-      cluster.unsubscribe(self)
-      cluster.leave(cluster.selfAddress)
-      context.stop(self)
-      system.scheduler.scheduleOnce(Duration.Zero) {
-        try {
-          Await.result(system.whenTerminated, Duration(3, TimeUnit.SECONDS))
-        } catch {
-          case ex: Exception => // Ignore
-        }
-        system.terminate()
-      }
-    }
-  }
-}
-
-object MasterWatcher {
-  /** Message the watcher sends to itself to start the shutdown. */
-  object Shutdown
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/io/gearpump/cluster/main/Replay.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/cluster/main/Replay.scala 
b/daemon/src/main/scala/io/gearpump/cluster/main/Replay.scala
deleted file mode 100644
index c9a6e9c..0000000
--- a/daemon/src/main/scala/io/gearpump/cluster/main/Replay.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.cluster.main
-
-import org.slf4j.Logger
-
-import io.gearpump.cluster.client.ClientContext
-import io.gearpump.util.{AkkaApp, LogUtil}
-
-// Internal tool to restart an application
-object Replay extends AkkaApp with ArgumentsParser {
-
-  private val LOG: Logger = LogUtil.getLogger(getClass)
-
-  override val options: Array[(String, CLIOption[Any])] = Array(
-    "appid" -> CLIOption("<application id>", required = true),
-    // For document purpose only, OPTION_CONFIG option is not used here.
-    // OPTION_CONFIG is parsed by parent shell command "Gear" transparently.
-    Gear.OPTION_CONFIG -> CLIOption("custom configuration file", required = false,
-      defaultValue = None))
-
-  override val description = "Replay the application from current min clock(low watermark)"
-
-  /**
-   * Asks the client to replay the application given by the "appid" option.
-   *
-   * Does nothing when `parse` returns null.
-   *
-   * @param akkaConf configuration used to create the client
-   * @param args command line arguments
-   */
-  def main(akkaConf: Config, args: Array[String]): Unit = {
-    val config = parse(args)
-
-    if (null != config) {
-      val client = ClientContext(akkaConf)
-      try {
-        client.replayFromTimestampWindowTrailingEdge(config.getInt("appid"))
-      } finally {
-        // Close the client even when the replay request fails.
-        client.close()
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/io/gearpump/cluster/main/Worker.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/cluster/main/Worker.scala 
b/daemon/src/main/scala/io/gearpump/cluster/main/Worker.scala
deleted file mode 100644
index 4818262..0000000
--- a/daemon/src/main/scala/io/gearpump/cluster/main/Worker.scala
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.cluster.main
-
-import scala.collection.JavaConverters._
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-
-import akka.actor.{ActorSystem, Props}
-import org.slf4j.Logger
-
-import io.gearpump.cluster.ClusterConfig
-import io.gearpump.cluster.master.MasterProxy
-import io.gearpump.cluster.worker.{Worker => WorkerActor}
-import io.gearpump.transport.HostPort
-import io.gearpump.util.Constants._
-import io.gearpump.util.LogUtil.ProcessType
-import io.gearpump.util.{AkkaApp, LogUtil}
-
-/** Tool to start a worker daemon process */
-object Worker extends AkkaApp with ArgumentsParser {
-  protected override def akkaConfig = ClusterConfig.worker()
-
-  override val description = "Start a worker daemon"
-
-  var LOG: Logger = LogUtil.getLogger(getClass)
-
-  private def uuid = java.util.UUID.randomUUID.toString
-
-  def main(akkaConf: Config, args: Array[String]): Unit = {
-    val id = uuid
-
-    this.LOG = {
-      LogUtil.loadConfiguration(akkaConf, ProcessType.WORKER)
-      // Delay creation of LOG instance to avoid creating an empty log file as 
we
-      // reset the log file name here
-      LogUtil.getLogger(getClass)
-    }
-
-    val system = ActorSystem(id, akkaConf)
-
-    val masterAddress = 
akkaConf.getStringList(GEARPUMP_CLUSTER_MASTERS).asScala.map { address =>
-      val hostAndPort = address.split(":")
-      HostPort(hostAndPort(0), hostAndPort(1).toInt)
-    }
-
-    LOG.info(s"Trying to connect to masters " + masterAddress.mkString(",") + 
"...")
-    val masterProxy = system.actorOf(MasterProxy.props(masterAddress), 
s"masterproxy${system.name}")
-
-    system.actorOf(Props(classOf[WorkerActor], masterProxy),
-      classOf[WorkerActor].getSimpleName + id)
-
-    Await.result(system.whenTerminated, Duration.Inf)
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/io/gearpump/cluster/master/AppManager.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/cluster/master/AppManager.scala 
b/daemon/src/main/scala/io/gearpump/cluster/master/AppManager.scala
deleted file mode 100644
index 058533e..0000000
--- a/daemon/src/main/scala/io/gearpump/cluster/master/AppManager.scala
+++ /dev/null
@@ -1,341 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.cluster.master
-
-import scala.concurrent.Future
-import scala.concurrent.duration._
-import scala.util.{Failure, Success}
-
-import akka.actor._
-import akka.pattern.ask
-import org.slf4j.Logger
-
-import io.gearpump.cluster.AppMasterToMaster.{AppDataSaved, SaveAppDataFailed, 
_}
-import io.gearpump.cluster.AppMasterToWorker._
-import io.gearpump.cluster.ClientToMaster._
-import io.gearpump.cluster.MasterToAppMaster.{AppMasterData, 
AppMasterDataRequest, AppMastersDataRequest, _}
-import io.gearpump.cluster.MasterToClient._
-import io.gearpump.cluster.WorkerToAppMaster.{ShutdownExecutorFailed, _}
-import io.gearpump.cluster.appmaster.{AppMasterRuntimeInfo, ApplicationState}
-import io.gearpump.cluster.master.AppManager._
-import io.gearpump.cluster.master.InMemoryKVService.{GetKVResult, PutKVResult, 
PutKVSuccess, _}
-import io.gearpump.cluster.master.Master._
-import io.gearpump.util.Constants._
-import io.gearpump.util.{ActorUtil, TimeOutScheduler, Util, _}
-
-/**
- * AppManager is dedicated child of Master to manager all applications.
- */
-private[cluster] class AppManager(kvService: ActorRef, launcher: 
AppMasterLauncherFactory)
-  extends Actor with Stash with TimeOutScheduler {
-
-  private val LOG: Logger = LogUtil.getLogger(getClass)
-
-  private val executorId: Int = APPMASTER_DEFAULT_EXECUTOR_ID
-  private val appMasterMaxRetries: Int = 5
-  private val appMasterRetryTimeRange: Duration = 20.seconds
-
-  implicit val timeout = FUTURE_TIMEOUT
-  implicit val executionContext = context.dispatcher
-
-  // Next available appId
-  private var appId: Int = 1
-
-  // From appid to appMaster data
-  private var appMasterRegistry = Map.empty[Int, (ActorRef, 
AppMasterRuntimeInfo)]
-
-  // Dead appmaster list
-  private var deadAppMasters = Map.empty[Int, (ActorRef, AppMasterRuntimeInfo)]
-
-  private var appMasterRestartPolicies = Map.empty[Int, RestartPolicy]
-
-  def receive: Receive = null
-
-  kvService ! GetKV(MASTER_GROUP, MASTER_STATE)
-  context.become(waitForMasterState)
-
-  def waitForMasterState: Receive = {
-    case GetKVSuccess(_, result) =>
-      val masterState = result.asInstanceOf[MasterState]
-      if (masterState != null) {
-        this.appId = masterState.maxId + 1
-        this.deadAppMasters = masterState.deadAppMasters
-        this.appMasterRegistry = masterState.appMasterRegistry
-      }
-      context.become(receiveHandler)
-      unstashAll()
-    case GetKVFailed(ex) =>
-      LOG.error("Failed to get master state, shutting down master to avoid 
data corruption...")
-      context.parent ! PoisonPill
-    case msg =>
-      LOG.info(s"Get message ${msg.getClass.getSimpleName}")
-      stash()
-  }
-
-  def receiveHandler: Receive = {
-    val msg = "Application Manager started. Ready for application 
submission..."
-    LOG.info(msg)
-    clientMsgHandler orElse appMasterMessage orElse selfMsgHandler orElse 
workerMessage orElse
-      appDataStoreService orElse terminationWatch
-  }
-
-  def clientMsgHandler: Receive = {
-    case SubmitApplication(app, jar, username) =>
-      LOG.info(s"Submit Application ${app.name}($appId) by $username...")
-      val client = sender
-      if (applicationNameExist(app.name)) {
-        client ! SubmitApplicationResult(Failure(
-          new Exception(s"Application name ${app.name} already existed")))
-      } else {
-        context.actorOf(launcher.props(appId, executorId, app, jar, username, 
context.parent,
-          Some(client)), s"launcher${appId}_${Util.randInt()}")
-
-        val appState = new ApplicationState(appId, app.name, 0, app, jar, 
username, null)
-        appMasterRestartPolicies += appId ->
-          new RestartPolicy(appMasterMaxRetries, appMasterRetryTimeRange)
-        kvService ! PutKV(appId.toString, APP_STATE, appState)
-        appId += 1
-      }
-
-    case RestartApplication(appId) =>
-      val client = sender()
-      (kvService ? GetKV(appId.toString, 
APP_STATE)).asInstanceOf[Future[GetKVResult]].map {
-        case GetKVSuccess(_, result) =>
-          val appState = result.asInstanceOf[ApplicationState]
-          if (appState != null) {
-            LOG.info(s"Shutting down the application (restart), $appId")
-            self ! ShutdownApplication(appId)
-            self.tell(SubmitApplication(appState.app, appState.jar, 
appState.username), client)
-          } else {
-            client ! SubmitApplicationResult(Failure(
-              new Exception(s"Failed to restart, because the application 
$appId does not exist.")
-            ))
-          }
-        case GetKVFailed(ex) =>
-          client ! SubmitApplicationResult(Failure(
-            new Exception(s"Unable to obtain the Master State. " +
-              s"Application $appId will not be restarted.")
-          ))
-      }
-
-    case ShutdownApplication(appId) =>
-      LOG.info(s"App Manager Shutting down application $appId")
-      val (_, info) = appMasterRegistry.getOrElse(appId, (null, null))
-      Option(info) match {
-        case Some(info) =>
-          val worker = info.worker
-          val workerPath = Option(worker).map(_.path).orNull
-          LOG.info(s"Shutdown AppMaster at ${workerPath}, appId: $appId, 
executorId: $executorId")
-          cleanApplicationData(appId)
-          val shutdown = ShutdownExecutor(appId, executorId,
-            s"AppMaster $appId shutdown requested by master...")
-          sendMsgWithTimeOutCallBack(worker, shutdown, 30000, 
shutDownExecutorTimeOut())
-          sender ! ShutdownApplicationResult(Success(appId))
-        case None =>
-          val errorMsg = s"Failed to find regisration information for appId: 
$appId"
-          LOG.error(errorMsg)
-          sender ! ShutdownApplicationResult(Failure(new Exception(errorMsg)))
-      }
-    case ResolveAppId(appId) =>
-      val (appMaster, _) = appMasterRegistry.getOrElse(appId, (null, null))
-      if (null != appMaster) {
-        sender ! ResolveAppIdResult(Success(appMaster))
-      } else {
-        sender ! ResolveAppIdResult(Failure(new Exception(s"Can not find 
Application: $appId")))
-      }
-    case AppMastersDataRequest =>
-      var appMastersData = collection.mutable.ListBuffer[AppMasterData]()
-      appMasterRegistry.foreach(pair => {
-        val (id, (appMaster: ActorRef, info: AppMasterRuntimeInfo)) = pair
-        val appMasterPath = ActorUtil.getFullPath(context.system, 
appMaster.path)
-        val workerPath = Option(info.worker).map(worker =>
-          ActorUtil.getFullPath(context.system, worker.path))
-        appMastersData += AppMasterData(
-          AppMasterActive, id, info.appName, appMasterPath, workerPath.orNull,
-          info.submissionTime, info.startTime, info.finishTime, info.user)
-      })
-
-      deadAppMasters.foreach(pair => {
-        val (id, (appMaster: ActorRef, info: AppMasterRuntimeInfo)) = pair
-        val appMasterPath = ActorUtil.getFullPath(context.system, 
appMaster.path)
-        val workerPath = Option(info.worker).map(worker =>
-          ActorUtil.getFullPath(context.system, worker.path))
-
-        appMastersData += AppMasterData(
-          AppMasterInActive, id, info.appName, appMasterPath, 
workerPath.orNull,
-          info.submissionTime, info.startTime, info.finishTime, info.user)
-      })
-
-      sender ! AppMastersData(appMastersData.toList)
-    case QueryAppMasterConfig(appId) =>
-      val config =
-        if (appMasterRegistry.contains(appId)) {
-          val (_, info) = appMasterRegistry(appId)
-          info.config
-        } else if (deadAppMasters.contains(appId)) {
-          val (_, info) = deadAppMasters(appId)
-          info.config
-        } else {
-          null
-        }
-      sender ! AppMasterConfig(config)
-
-    case appMasterDataRequest: AppMasterDataRequest =>
-      val appId = appMasterDataRequest.appId
-      val (appStatus, appMaster, info) =
-        if (appMasterRegistry.contains(appId)) {
-          val (appMaster, info) = appMasterRegistry(appId)
-          (AppMasterActive, appMaster, info)
-        } else if (deadAppMasters.contains(appId)) {
-          val (appMaster, info) = deadAppMasters(appId)
-          (AppMasterInActive, appMaster, info)
-        } else {
-          (AppMasterNonExist, null, null)
-        }
-
-      appStatus match {
-        case AppMasterActive | AppMasterInActive =>
-          val appMasterPath = ActorUtil.getFullPath(context.system, 
appMaster.path)
-          val workerPath = Option(info.worker).map(
-            worker => ActorUtil.getFullPath(context.system, 
worker.path)).orNull
-          sender ! AppMasterData(
-            appStatus, appId, info.appName, appMasterPath, workerPath,
-            info.submissionTime, info.startTime, info.finishTime, info.user)
-
-        case AppMasterNonExist =>
-          sender ! AppMasterData(AppMasterNonExist)
-      }
-  }
-
-  def workerMessage: Receive = {
-    case ShutdownExecutorSucceed(appId, executorId) =>
-      LOG.info(s"Shut down executor $executorId for application $appId 
successfully")
-    case failed: ShutdownExecutorFailed =>
-      LOG.error(failed.reason)
-  }
-
-  private def shutDownExecutorTimeOut(): Unit = {
-    LOG.error(s"Shut down executor time out")
-  }
-
-  def appMasterMessage: Receive = {
-    case RegisterAppMaster(appMaster, registerBack: AppMasterRuntimeInfo) =>
-      val startTime = System.currentTimeMillis()
-      val register = registerBack.copy(startTime = startTime)
-
-      LOG.info(s"Register AppMaster for app: ${register.appId} $register")
-      context.watch(appMaster)
-      appMasterRegistry += register.appId -> (appMaster, register)
-      kvService ! PutKV(MASTER_GROUP, MASTER_STATE,
-        MasterState(appId, appMasterRegistry, deadAppMasters))
-      sender ! AppMasterRegistered(register.appId)
-  }
-
-  def appDataStoreService: Receive = {
-    case SaveAppData(appId, key, value) =>
-      val client = sender
-      (kvService ? PutKV(appId.toString, key, 
value)).asInstanceOf[Future[PutKVResult]].map {
-        case PutKVSuccess =>
-          client ! AppDataSaved
-        case PutKVFailed(k, ex) =>
-          client ! SaveAppDataFailed
-      }
-    case GetAppData(appId, key) =>
-      val client = sender
-      (kvService ? GetKV(appId.toString, 
key)).asInstanceOf[Future[GetKVResult]].map {
-        case GetKVSuccess(privateKey, value) =>
-          client ! GetAppDataResult(key, value)
-        case GetKVFailed(ex) =>
-          client ! GetAppDataResult(key, null)
-      }
-  }
-
-  def terminationWatch: Receive = {
-    case terminate: Terminated =>
-      terminate.getAddressTerminated()
-      LOG.info(s"AppMaster(${terminate.actor.path}) is terminiated, " +
-        s"network down: ${terminate.getAddressTerminated()}")
-
-      // Now we assume that the only normal way to stop the application is 
submitting a
-      // ShutdownApplication request
-      val application = appMasterRegistry.find { appInfo =>
-        val (_, (actorRef, _)) = appInfo
-        actorRef.compareTo(terminate.actor) == 0
-      }
-      if (application.nonEmpty) {
-        val appId = application.get._1
-        (kvService ? GetKV(appId.toString, 
APP_STATE)).asInstanceOf[Future[GetKVResult]].map {
-          case GetKVSuccess(_, result) =>
-            val appState = result.asInstanceOf[ApplicationState]
-            if (appState != null) {
-              LOG.info(s"Recovering application, $appId")
-              self ! RecoverApplication(appState)
-            } else {
-              LOG.error(s"Cannot find application state for $appId")
-            }
-          case GetKVFailed(ex) =>
-            LOG.error(s"Cannot find master state to recover")
-        }
-      }
-  }
-
-  def selfMsgHandler: Receive = {
-    case RecoverApplication(state) =>
-      val appId = state.appId
-      if (appMasterRestartPolicies.get(appId).get.allowRestart) {
-        LOG.info(s"AppManager Recovering Application $appId...")
-        context.actorOf(launcher.props(appId, executorId, state.app, 
state.jar, state.username,
-          context.parent, None), s"launcher${appId}_${Util.randInt()}")
-      } else {
-        LOG.error(s"Application $appId failed too many times")
-      }
-  }
-
-  case class RecoverApplication(applicationStatus: ApplicationState)
-
-  private def cleanApplicationData(appId: Int): Unit = {
-    // Add the dead app to dead appMaster
-    appMasterRegistry.get(appId).foreach { pair =>
-      val (appMasterActor, info) = pair
-      deadAppMasters += appId -> (appMasterActor, info.copy(
-        finishTime = System.currentTimeMillis()))
-    }
-
-    appMasterRegistry -= appId
-
-    kvService ! PutKV(MASTER_GROUP, MASTER_STATE,
-      MasterState(this.appId, appMasterRegistry, deadAppMasters))
-    kvService ! DeleteKVGroup(appId.toString)
-  }
-
-  private def applicationNameExist(appName: String): Boolean = {
-    appMasterRegistry.values.exists(_._2.appName == appName)
-  }
-}
-
-object AppManager {
-  final val APP_STATE = "app_state"
-  // The id is used in KVStore
-  final val MASTER_STATE = "master_state"
-
-  case class MasterState(
-      maxId: Int,
-      appMasterRegistry: Map[Int, (ActorRef, AppMasterRuntimeInfo)],
-      deadAppMasters: Map[Int, (ActorRef, AppMasterRuntimeInfo)])
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/io/gearpump/cluster/master/InMemoryKVService.scala
----------------------------------------------------------------------
diff --git 
a/daemon/src/main/scala/io/gearpump/cluster/master/InMemoryKVService.scala 
b/daemon/src/main/scala/io/gearpump/cluster/master/InMemoryKVService.scala
deleted file mode 100644
index 616d3ee..0000000
--- a/daemon/src/main/scala/io/gearpump/cluster/master/InMemoryKVService.scala
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.cluster.master
-
-import java.util.concurrent.TimeUnit
-import scala.concurrent.TimeoutException
-import scala.concurrent.duration.Duration
-
-import akka.actor._
-import akka.cluster.Cluster
-import akka.cluster.ddata.Replicator._
-import akka.cluster.ddata.{DistributedData, LWWMap, LWWMapKey}
-import org.slf4j.Logger
-
-import io.gearpump.util.LogUtil
-
-/**
- * A replicated simple in-memory KV service. The replications are stored on 
all masters.
- */
-class InMemoryKVService extends Actor with Stash {
-  import io.gearpump.cluster.master.InMemoryKVService._
-
-  private val KV_SERVICE = "gearpump_kvservice"
-
-  private val LOG: Logger = LogUtil.getLogger(getClass)
-  private val replicator = DistributedData(context.system).replicator
-  private implicit val cluster = Cluster(context.system)
-
-  // Optimize write path, we can tolerate one master down for recovery.
-  private val timeout = Duration(15, TimeUnit.SECONDS)
-  private val readMajority = ReadMajority(timeout)
-  private val writeMajority = WriteMajority(timeout)
-
-  private def groupKey(group: String): LWWMapKey[Any] = {
-    LWWMapKey[Any](KV_SERVICE + "_" + group)
-  }
-
-  def receive: Receive = kvService
-
-  def kvService: Receive = {
-
-    case GetKV(group: String, key: String) =>
-      val request = Request(sender(), key)
-      replicator ! Get(groupKey(group), readMajority, Some(request))
-    case success@GetSuccess(group: LWWMapKey[Any @unchecked], Some(request: 
Request)) =>
-      val appData = success.get(group)
-      LOG.info(s"Successfully retrived group: ${group.id}")
-      request.client ! GetKVSuccess(request.key, 
appData.get(request.key).orNull)
-    case NotFound(group: LWWMapKey[Any @unchecked], Some(request: Request)) =>
-      LOG.info(s"We cannot find group $group")
-      request.client ! GetKVSuccess(request.key, null)
-    case GetFailure(group: LWWMapKey[Any @unchecked], Some(request: Request)) 
=>
-      val error = s"Failed to get application data, the request key is 
${request.key}"
-      LOG.error(error)
-      request.client ! GetKVFailed(new Exception(error))
-
-    case PutKV(group: String, key: String, value: Any) =>
-      val request = Request(sender(), key)
-      val update = Update(groupKey(group), LWWMap(), writeMajority, 
Some(request)) { map =>
-        map + (key -> value)
-      }
-      replicator ! update
-    case UpdateSuccess(group: LWWMapKey[Any @unchecked], Some(request: 
Request)) =>
-      request.client ! PutKVSuccess
-    case ModifyFailure(group: LWWMapKey[Any @unchecked], error, cause, 
Some(request: Request)) =>
-      request.client ! PutKVFailed(request.key, new Exception(error, cause))
-    case UpdateTimeout(group: LWWMapKey[Any @unchecked], Some(request: 
Request)) =>
-      request.client ! PutKVFailed(request.key, new TimeoutException())
-
-    case delete@DeleteKVGroup(group: String) =>
-      replicator ! Delete(groupKey(group), writeMajority)
-    case DeleteSuccess(group) =>
-      LOG.info(s"KV Group ${group.id} is deleted")
-    case ReplicationDeleteFailure(group) =>
-      LOG.error(s"Failed to delete KV Group ${group.id}...")
-    case DataDeleted(group) =>
-      LOG.error(s"Group ${group.id} is deleted, you can no longer 
put/get/delete this group...")
-  }
-}
-
-object InMemoryKVService {
-  /**
-   * KV Service related
-   */
-  case class GetKV(group: String, key: String)
-
-  trait GetKVResult
-
-  case class GetKVSuccess(key: String, value: Any) extends GetKVResult
-
-  case class GetKVFailed(ex: Throwable) extends GetKVResult
-
-  case class PutKV(group: String, key: String, value: Any)
-
-  case class DeleteKVGroup(group: String)
-
-  case class GroupDeleted(group: String) extends GetKVResult with PutKVResult
-
-  trait PutKVResult
-
-  case object PutKVSuccess extends PutKVResult
-
-  case class PutKVFailed(key: String, ex: Throwable) extends PutKVResult
-
-  case class Request(client: ActorRef, key: String)
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/io/gearpump/cluster/master/Master.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/cluster/master/Master.scala 
b/daemon/src/main/scala/io/gearpump/cluster/master/Master.scala
deleted file mode 100644
index 0dfa381..0000000
--- a/daemon/src/main/scala/io/gearpump/cluster/master/Master.scala
+++ /dev/null
@@ -1,313 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.cluster.master
-
-import java.lang.management.ManagementFactory
-import scala.collection.JavaConverters._
-import scala.collection.immutable
-
-import akka.actor._
-import akka.remote.DisassociatedEvent
-import com.typesafe.config.Config
-import org.apache.commons.lang.exception.ExceptionUtils
-import org.slf4j.Logger
-
-import io.gearpump.cluster.AppMasterToMaster._
-import io.gearpump.cluster.ClientToMaster._
-import io.gearpump.cluster.ClusterConfig
-import io.gearpump.cluster.MasterToAppMaster._
-import io.gearpump.cluster.MasterToClient.{HistoryMetrics, HistoryMetricsItem, 
MasterConfig, ResolveWorkerIdResult}
-import io.gearpump.cluster.MasterToWorker._
-import io.gearpump.cluster.WorkerToMaster._
-import io.gearpump.cluster.master.InMemoryKVService._
-import io.gearpump.cluster.master.Master.{MasterInfo, WorkerTerminated, _}
-import io.gearpump.cluster.scheduler.Scheduler.ApplicationFinished
-import io.gearpump.cluster.worker.WorkerId
-import io.gearpump.jarstore.local.LocalJarStore
-import io.gearpump.metrics.Metrics.ReportMetrics
-import io.gearpump.metrics.{JvmMetricsSet, Metrics, MetricsReporterService}
-import io.gearpump.transport.HostPort
-import io.gearpump.util.Constants._
-import io.gearpump.util.HistoryMetricsService.HistoryMetricsConfig
-import io.gearpump.util._
-
-/**
- * Master Actor who manages resources of the whole cluster.
- * It is like the resource manager of YARN.
- */
-private[cluster] class Master extends Actor with Stash {
-  private val LOG: Logger = LogUtil.getLogger(getClass)
-  private val systemConfig: Config = context.system.settings.config
-  private implicit val timeout = Constants.FUTURE_TIMEOUT
-  private val kvService = context.actorOf(Props(new InMemoryKVService()), 
"kvService")
-  // Resources and resourceRequests can be dynamically constructed by
-  // heartbeat of worker and appmaster when master singleton is migrated.
-  // We don't need to persist them in cluster
-  private var appManager: ActorRef = null
-
-  private var scheduler: ActorRef = null
-
-  private var workers = new immutable.HashMap[ActorRef, WorkerId]
-
-  private val birth = System.currentTimeMillis()
-
-  private var nextWorkerId = 0
-
-  def receive: Receive = null
-
-  // Register jvm metrics
-  Metrics(context.system).register(new JvmMetricsSet(s"master"))
-
-  LOG.info("master is started at " + ActorUtil.getFullPath(context.system, 
self.path) + "...")
-
-  val jarStoreRootPath = 
systemConfig.getString(Constants.GEARPUMP_APP_JAR_STORE_ROOT_PATH)
-
-  private val jarStore = if (Util.isLocalPath(jarStoreRootPath)) {
-    Some(context.actorOf(Props(classOf[LocalJarStore], jarStoreRootPath)))
-  } else {
-    None
-  }
-
-  private val hostPort = 
HostPort(ActorUtil.getSystemAddress(context.system).hostPort)
-
-  // Maintain the list of active masters.
-  private var masters: List[MasterNode] = {
-    // Add myself into the list of initial masters.
-    List(MasterNode(hostPort.host, hostPort.port))
-  }
-
-  val metricsEnabled = systemConfig.getBoolean(GEARPUMP_METRIC_ENABLED)
-
-  val getHistoryMetricsConfig = HistoryMetricsConfig(systemConfig)
-  val historyMetricsService = if (metricsEnabled) {
-    val historyMetricsService = {
-      context.actorOf(Props(new HistoryMetricsService("master", 
getHistoryMetricsConfig)))
-    }
-
-    val metricsReportService = context.actorOf(
-      Props(new MetricsReporterService(Metrics(context.system))))
-    historyMetricsService.tell(ReportMetrics, metricsReportService)
-    Some(historyMetricsService)
-  } else {
-    None
-  }
-
-  kvService ! GetKV(MASTER_GROUP, WORKER_ID)
-  context.become(waitForNextWorkerId)
-
-  def waitForNextWorkerId: Receive = {
-    case GetKVSuccess(_, result) =>
-      if (result != null) {
-        this.nextWorkerId = result.asInstanceOf[Int]
-      } else {
-        LOG.warn("Cannot find existing state in the distributed cluster...")
-      }
-      context.become(receiveHandler)
-      unstashAll()
-    case GetKVFailed(ex) =>
-      LOG.error("Failed to get worker id, shutting down master to avoid data 
corruption...")
-      context.parent ! PoisonPill
-    case msg =>
-      LOG.info(s"Get message ${msg.getClass.getSimpleName}")
-      stash()
-  }
-
-  def receiveHandler: Receive = workerMsgHandler orElse
-    appMasterMsgHandler orElse
-    onMasterListChange orElse
-    clientMsgHandler orElse
-    metricsService orElse
-    jarStoreService orElse
-    terminationWatch orElse
-    disassociated orElse
-    kvServiceMsgHandler orElse
-    ActorUtil.defaultMsgHandler(self)
-
-  def workerMsgHandler: Receive = {
-    case RegisterNewWorker =>
-      val workerId = WorkerId(nextWorkerId, System.currentTimeMillis())
-      nextWorkerId += 1
-      kvService ! PutKV(MASTER_GROUP, WORKER_ID, nextWorkerId)
-      val workerHostname = ActorUtil.getHostname(sender())
-      LOG.info(s"Register new from $workerHostname ....")
-      self forward RegisterWorker(workerId)
-
-    case RegisterWorker(id) =>
-      context.watch(sender())
-      sender ! WorkerRegistered(id, MasterInfo(self, birth))
-      scheduler forward WorkerRegistered(id, MasterInfo(self, birth))
-      workers += (sender() -> id)
-      val workerHostname = ActorUtil.getHostname(sender())
-      LOG.info(s"Register Worker with id $id from $workerHostname ....")
-    case resourceUpdate: ResourceUpdate =>
-      scheduler forward resourceUpdate
-  }
-
-  def jarStoreService: Receive = {
-    case GetJarStoreServer =>
-      jarStore.foreach(_ forward GetJarStoreServer)
-  }
-
-  def kvServiceMsgHandler: Receive = {
-    case PutKVSuccess =>
-    // Skip
-    case PutKVFailed(key, exception) =>
-      LOG.error(s"Put KV of key $key to InMemoryKVService failed.\n" +
-        ExceptionUtils.getStackTrace(exception))
-  }
-
-  def metricsService: Receive = {
-    case query: QueryHistoryMetrics =>
-      if (historyMetricsService.isEmpty) {
-        // Returns empty metrics so that we don't hang the UI
-        sender ! HistoryMetrics(query.path, List.empty[HistoryMetricsItem])
-      } else {
-        historyMetricsService.get forward query
-      }
-  }
-
-  def appMasterMsgHandler: Receive = {
-    case request: RequestResource =>
-      scheduler forward request
-    case registerAppMaster: RegisterAppMaster =>
-      // Forward to appManager
-      appManager forward registerAppMaster
-    case save: SaveAppData =>
-      appManager forward save
-    case get: GetAppData =>
-      appManager forward get
-    case GetAllWorkers =>
-      sender ! WorkerList(workers.values.toList)
-    case GetMasterData =>
-      val aliveFor = System.currentTimeMillis() - birth
-      val logFileDir = LogUtil.daemonLogDir(systemConfig).getAbsolutePath
-      val userDir = System.getProperty("user.dir")
-
-      val masterDescription =
-        MasterSummary(
-          MasterNode(hostPort.host, hostPort.port),
-          masters,
-          aliveFor,
-          logFileDir,
-          jarStoreRootPath,
-          MasterStatus.Synced,
-          userDir,
-          List.empty[MasterActivity],
-          jvmName = ManagementFactory.getRuntimeMXBean().getName(),
-          historyMetricsConfig = getHistoryMetricsConfig
-        )
-
-      sender ! MasterData(masterDescription)
-
-    case invalidAppMaster: InvalidAppMaster =>
-      appManager forward invalidAppMaster
-  }
-
-  import scala.util.{Failure, Success}
-
-  def onMasterListChange: Receive = {
-    case MasterListUpdated(masters: List[MasterNode]) =>
-      this.masters = masters
-  }
-
-  def clientMsgHandler: Receive = {
-    case app: SubmitApplication =>
-      LOG.debug(s"Receive from client, SubmitApplication $app")
-      appManager.forward(app)
-    case app: RestartApplication =>
-      LOG.debug(s"Receive from client, RestartApplication $app")
-      appManager.forward(app)
-    case app: ShutdownApplication =>
-      LOG.debug(s"Receive from client, Shutting down Application ${app.appId}")
-      scheduler ! ApplicationFinished(app.appId)
-      appManager.forward(app)
-    case app: ResolveAppId =>
-      LOG.debug(s"Receive from client, resolving appId ${app.appId} to 
ActorRef")
-      appManager.forward(app)
-    case resolve: ResolveWorkerId =>
-      LOG.debug(s"Receive from client, resolving workerId ${resolve.workerId}")
-      val worker = workers.find(_._2 == resolve.workerId)
-      worker match {
-        case Some(worker) => sender ! ResolveWorkerIdResult(Success(worker._1))
-        case None => sender ! ResolveWorkerIdResult(Failure(
-          new Exception(s"cannot find worker ${resolve.workerId}")))
-      }
-    case AppMastersDataRequest =>
-      LOG.debug("Master received AppMastersDataRequest")
-      appManager forward AppMastersDataRequest
-    case appMasterDataRequest: AppMasterDataRequest =>
-      LOG.debug("Master received AppMasterDataRequest")
-      appManager forward appMasterDataRequest
-    case query: QueryAppMasterConfig =>
-      LOG.debug("Master received QueryAppMasterConfig")
-      appManager forward query
-    case QueryMasterConfig =>
-      sender ! MasterConfig(ClusterConfig.filterOutDefaultConfig(systemConfig))
-  }
-
-  def disassociated: Receive = {
-    case disassociated: DisassociatedEvent =>
-      LOG.info(s" disassociated ${disassociated.remoteAddress}")
-  }
-
-  def terminationWatch: Receive = {
-    case t: Terminated =>
-      val actor = t.actor
-      LOG.info(s"worker ${actor.path} get terminated, is it due to network 
reason?" +
-        t.getAddressTerminated())
-
-      LOG.info("Let's filter out dead resources...")
-      // Filters out dead worker resource
-      if (workers.keySet.contains(actor)) {
-        scheduler ! WorkerTerminated(workers.get(actor).get)
-        workers -= actor
-      }
-  }
-
-  override def preStart(): Unit = {
-    val path = ActorUtil.getFullPath(context.system, self.path)
-    LOG.info(s"master path is $path")
-    val schedulerClass = Class.forName(
-      systemConfig.getString(Constants.GEARPUMP_SCHEDULING_SCHEDULER))
-
-    appManager = context.actorOf(Props(new AppManager(kvService, 
AppMasterLauncher)),
-      classOf[AppManager].getSimpleName)
-    scheduler = context.actorOf(Props(schedulerClass))
-    context.system.eventStream.subscribe(self, classOf[DisassociatedEvent])
-  }
-}
-
-object Master {
-  final val MASTER_GROUP = "master_group"
-
-  final val WORKER_ID = "next_worker_id"
-
-  case class WorkerTerminated(workerId: WorkerId)
-
-  case class MasterInfo(master: ActorRef, startTime: Long = 0L)
-
-  /** Notify the subscriber that master actor list has been updated */
-  case class MasterListUpdated(masters: List[MasterNode])
-
-  object MasterInfo {
-    def empty: MasterInfo = MasterInfo(null)
-  }
-
-  case class SlotStatus(totalSlots: Int, availableSlots: Int)
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/io/gearpump/cluster/scheduler/PriorityScheduler.scala
----------------------------------------------------------------------
diff --git 
a/daemon/src/main/scala/io/gearpump/cluster/scheduler/PriorityScheduler.scala 
b/daemon/src/main/scala/io/gearpump/cluster/scheduler/PriorityScheduler.scala
deleted file mode 100644
index 5df008e..0000000
--- 
a/daemon/src/main/scala/io/gearpump/cluster/scheduler/PriorityScheduler.scala
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.cluster.scheduler
-
-import scala.collection.mutable
-
-import akka.actor.ActorRef
-
-import io.gearpump.cluster.AppMasterToMaster.RequestResource
-import io.gearpump.cluster.MasterToAppMaster.ResourceAllocated
-import io.gearpump.cluster.scheduler.Relaxation._
-import io.gearpump.cluster.scheduler.Scheduler.PendingRequest
-import io.gearpump.cluster.worker.WorkerId
-
-/** Assign resource to application based on the priority of the application */
-class PriorityScheduler extends Scheduler {
-  private var resourceRequests = new 
mutable.PriorityQueue[PendingRequest]()(requestOrdering)
-
-  def requestOrdering: Ordering[PendingRequest] = new Ordering[PendingRequest] 
{
-    override def compare(x: PendingRequest, y: PendingRequest): Int = {
-      var res = x.request.priority.id - y.request.priority.id
-      if (res == 0) {
-        res = y.timeStamp.compareTo(x.timeStamp)
-      }
-      res
-    }
-  }
-
-  override def receive: Receive = super.handleScheduleMessage orElse 
resourceRequestHandler
-
-  override def allocateResource(): Unit = {
-    var scheduleLater = Array.empty[PendingRequest]
-    val resourcesSnapShot = resources.clone()
-    var allocated = Resource.empty
-    val totalResource = Resource(resourcesSnapShot.values.map(_._2.slots).sum)
-
-    while (resourceRequests.nonEmpty && (allocated < totalResource)) {
-      val PendingRequest(appId, appMaster, request, timeStamp) = 
resourceRequests.dequeue()
-      request.relaxation match {
-        case ANY =>
-          val allocations = allocateFairly(resourcesSnapShot, request)
-          val newAllocated = Resource(allocations.map(_.resource.slots).sum)
-          if (allocations.nonEmpty) {
-            appMaster ! ResourceAllocated(allocations.toArray)
-          }
-          if (newAllocated < request.resource) {
-            val remainingRequest = request.resource - newAllocated
-            val remainingExecutors = request.executorNum - allocations.length
-            val newResourceRequest = request.copy(resource = remainingRequest,
-              executorNum = remainingExecutors)
-            scheduleLater = scheduleLater :+
-              PendingRequest(appId, appMaster, newResourceRequest, timeStamp)
-          }
-          allocated = allocated + newAllocated
-        case ONEWORKER =>
-          val availableResource = resourcesSnapShot.find { params =>
-            val (_, (_, resource)) = params
-            resource > request.resource
-          }
-          if (availableResource.nonEmpty) {
-            val (workerId, (worker, resource)) = availableResource.get
-            allocated = allocated + request.resource
-            appMaster ! 
ResourceAllocated(Array(ResourceAllocation(request.resource, worker,
-              workerId)))
-            resourcesSnapShot.update(workerId, (worker, resource - 
request.resource))
-          } else {
-            scheduleLater = scheduleLater :+ PendingRequest(appId, appMaster, 
request, timeStamp)
-          }
-        case SPECIFICWORKER =>
-          val workerAndResource = resourcesSnapShot.get(request.workerId)
-          if (workerAndResource.nonEmpty && workerAndResource.get._2 > 
request.resource) {
-            val (worker, availableResource) = workerAndResource.get
-            appMaster ! 
ResourceAllocated(Array(ResourceAllocation(request.resource, worker,
-              request.workerId)))
-            allocated = allocated + request.resource
-            resourcesSnapShot.update(request.workerId, (worker,
-              availableResource - request.resource))
-          } else {
-            scheduleLater = scheduleLater :+ PendingRequest(appId, appMaster, 
request, timeStamp)
-          }
-      }
-    }
-    for (request <- scheduleLater)
-      resourceRequests.enqueue(request)
-  }
-
-  def resourceRequestHandler: Receive = {
-    case RequestResource(appId, request) =>
-      LOG.info(s"Request resource: appId: $appId, slots: 
${request.resource.slots}, " +
-        s"relaxation: ${request.relaxation}, executor number: 
${request.executorNum}")
-      val appMaster = sender()
-      resourceRequests.enqueue(new PendingRequest(appId, appMaster, request,
-        System.currentTimeMillis()))
-      allocateResource()
-  }
-
-  override def doneApplication(appId: Int): Unit = {
-    resourceRequests = resourceRequests.filter(_.appId != appId)
-  }
-
-  private def allocateFairly(
-      resources: mutable.HashMap[WorkerId, (ActorRef, Resource)], request: 
ResourceRequest)
-    : List[ResourceAllocation] = {
-    val workerNum = resources.size
-    var allocations = List.empty[ResourceAllocation]
-    var totalAvailable = Resource(resources.values.map(_._2.slots).sum)
-    var remainingRequest = request.resource
-    var remainingExecutors = Math.min(request.executorNum, 
request.resource.slots)
-
-    while (remainingExecutors > 0 && !totalAvailable.isEmpty) {
-      val exeutorNum = Math.min(workerNum, remainingExecutors)
-      val toRequest = Resource(remainingRequest.slots * exeutorNum / 
remainingExecutors)
-
-      val sortedResources = 
resources.toArray.sortBy(_._2._2.slots)(Ordering[Int].reverse)
-      val pickedResources = sortedResources.take(exeutorNum)
-
-      val flattenResource = pickedResources.zipWithIndex.flatMap { 
workerWithIndex =>
-        val ((workerId, (worker, resource)), index) = workerWithIndex
-        0.until(resource.slots).map(seq => ((workerId, worker), seq * 
workerNum + index))
-      }.sortBy(_._2).map(_._1)
-
-      if (flattenResource.length < toRequest.slots) {
-        // Can not safisfy the user's requirements
-        totalAvailable = Resource.empty
-      } else {
-        flattenResource.take(toRequest.slots).groupBy(actor => 
actor).mapValues(_.length).
-          toArray.foreach { params =>
-          val ((workerId, worker), slots) = params
-          resources.update(workerId, (worker, resources.get(workerId).get._2 - 
Resource(slots)))
-          allocations :+= ResourceAllocation(Resource(slots), worker, workerId)
-        }
-        totalAvailable -= toRequest
-        remainingRequest -= toRequest
-        remainingExecutors -= exeutorNum
-      }
-    }
-    allocations
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/io/gearpump/cluster/scheduler/Scheduler.scala
----------------------------------------------------------------------
diff --git 
a/daemon/src/main/scala/io/gearpump/cluster/scheduler/Scheduler.scala 
b/daemon/src/main/scala/io/gearpump/cluster/scheduler/Scheduler.scala
deleted file mode 100644
index ccd105f..0000000
--- a/daemon/src/main/scala/io/gearpump/cluster/scheduler/Scheduler.scala
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.cluster.scheduler
-
-import scala.collection.mutable
-
-import akka.actor.{Actor, ActorRef}
-import org.slf4j.Logger
-
-import io.gearpump.TimeStamp
-import io.gearpump.cluster.MasterToWorker.{UpdateResourceFailed, 
UpdateResourceSucceed, WorkerRegistered}
-import io.gearpump.cluster.WorkerToMaster.ResourceUpdate
-import io.gearpump.cluster.master.Master.WorkerTerminated
-import io.gearpump.cluster.scheduler.Scheduler.ApplicationFinished
-import io.gearpump.cluster.worker.WorkerId
-import io.gearpump.util.LogUtil
-
-/**
- * Scheduler schedule resource for different applications.
- */
-abstract class Scheduler extends Actor {
-  val LOG: Logger = LogUtil.getLogger(getClass)
-  protected var resources = new mutable.HashMap[WorkerId, (ActorRef, Resource)]
-
-  def handleScheduleMessage: Receive = {
-    case WorkerRegistered(id, _) =>
-      if (!resources.contains(id)) {
-        LOG.info(s"Worker $id added to the scheduler")
-        resources.put(id, (sender, Resource.empty))
-      }
-    case update@ResourceUpdate(worker, workerId, resource) =>
-      LOG.info(s"$update...")
-      if (resources.contains(workerId)) {
-        val resourceReturned = resource > resources.get(workerId).get._2
-        resources.update(workerId, (worker, resource))
-        if (resourceReturned) {
-          allocateResource()
-        }
-        sender ! UpdateResourceSucceed
-      }
-      else {
-        sender ! UpdateResourceFailed(
-          s"ResourceUpdate failed! The worker $workerId has not been 
registered into master")
-      }
-    case WorkerTerminated(workerId) =>
-      if (resources.contains(workerId)) {
-        resources -= workerId
-      }
-    case ApplicationFinished(appId) =>
-      doneApplication(appId)
-  }
-
-  def allocateResource(): Unit
-
-  def doneApplication(appId: Int): Unit
-}
-
-object Scheduler {
-  case class PendingRequest(
-      appId: Int, appMaster: ActorRef, request: ResourceRequest, timeStamp: 
TimeStamp)
-
-  case class ApplicationFinished(appId: Int)
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/io/gearpump/cluster/worker/DefaultExecutorProcessLauncher.scala
----------------------------------------------------------------------
diff --git 
a/daemon/src/main/scala/io/gearpump/cluster/worker/DefaultExecutorProcessLauncher.scala
 
b/daemon/src/main/scala/io/gearpump/cluster/worker/DefaultExecutorProcessLauncher.scala
deleted file mode 100644
index f97a209..0000000
--- 
a/daemon/src/main/scala/io/gearpump/cluster/worker/DefaultExecutorProcessLauncher.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.cluster.worker
-
-import java.io.File
-
-import com.typesafe.config.Config
-import org.slf4j.Logger
-
-import io.gearpump.cluster.scheduler.Resource
-import io.gearpump.util.{LogUtil, RichProcess, Util}
-
-/** Launcher to start an executor process */
-class DefaultExecutorProcessLauncher(val config: Config) extends 
ExecutorProcessLauncher {
-  private val LOG: Logger = LogUtil.getLogger(getClass)
-
-  override def createProcess(
-      appId: Int, executorId: Int, resource: Resource, config: Config, 
options: Array[String],
-      classPath: Array[String], mainClass: String, arguments: Array[String]): 
RichProcess = {
-
-    LOG.info(s"Launch executor, classpath: 
${classPath.mkString(File.pathSeparator)}")
-    Util.startProcess(options, classPath, mainClass, arguments)
-  }
-
-  override def cleanProcess(appId: Int, executorId: Int): Unit = {}
-}

Reply via email to