http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/org/apache/gearpump/util/FileServer.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/util/FileServer.scala 
b/daemon/src/main/scala/org/apache/gearpump/util/FileServer.scala
new file mode 100644
index 0000000..3a0faad
--- /dev/null
+++ b/daemon/src/main/scala/org/apache/gearpump/util/FileServer.scala
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.util
+
+import java.io.File
+import scala.concurrent.{ExecutionContext, Future}
+
+import akka.actor.ActorSystem
+import akka.http.scaladsl.Http
+import akka.http.scaladsl.Http.ServerBinding
+import akka.http.scaladsl.marshalling.Marshal
+import akka.http.scaladsl.model.Uri.{Path, Query}
+import akka.http.scaladsl.model.{HttpEntity, HttpRequest, MediaTypes, 
Multipart, _}
+import akka.http.scaladsl.server.Directives._
+import akka.http.scaladsl.server._
+import akka.http.scaladsl.unmarshalling.Unmarshal
+import akka.stream.ActorMaterializer
+import akka.stream.scaladsl.{FileIO, Sink, Source}
+import spray.json.DefaultJsonProtocol._
+import spray.json.JsonFormat
+
+import org.apache.gearpump.jarstore.FilePath
+import org.apache.gearpump.util.FileDirective._
+import org.apache.gearpump.util.FileServer.Port
+
+/**
+ * A simple file server implemented with akka-http to store/fetch large
+ * binary files.
+ */
+class FileServer(system: ActorSystem, host: String, port: Int = 0, 
rootDirectory: File) {
+  import system.dispatcher
+  implicit val actorSystem = system
+  implicit val materializer = ActorMaterializer()
+  implicit def ec: ExecutionContext = system.dispatcher
+
+  val route: Route = {
+    path("upload") {
+      uploadFileTo(rootDirectory) { form =>
+        val fileName = form.fields.headOption.flatMap { pair =>
+          val (_, fileInfo) = pair
+          fileInfo match {
+            case Left(file) => Option(file.file).map(_.getName)
+            case Right(_) => None
+          }
+        }
+
+        if (fileName.isDefined) {
+          complete(fileName.get)
+        } else {
+          failWith(new Exception("File not found in the uploaded form"))
+        }
+      }
+    } ~
+      path("download") {
+        parameters("file") { file: String =>
+          downloadFile(new File(rootDirectory, file))
+        }
+      } ~
+      pathEndOrSingleSlash {
+        extractUri { uri =>
+          val upload = uri.withPath(Uri.Path("/upload")).toString()
+          val entity = HttpEntity(ContentTypes.`text/html(UTF-8)`,
+            s"""
+            |
+            |<h2>Please specify a file to upload:</h2>
+            |<form action="$upload" enctype="multipart/form-data" 
method="post">
+            |<input type="file" name="datafile" size="40">
+            |</p>
+            |<div>
+            |<input type="submit" value="Submit">
+            |</div>
+            |</form>
+          """.stripMargin)
+        complete(entity)
+      }
+    }
+  }
+
+  private var connection: Future[ServerBinding] = null
+
+  def start: Future[Port] = {
+    connection = Http().bindAndHandle(Route.handlerFlow(route), host, port)
+    connection.map(address => Port(address.localAddress.getPort))
+  }
+
+  def stop: Future[Unit] = {
+    connection.flatMap(_.unbind())
+  }
+}
+
+object FileServer {
+
+  implicit def filePathFormat: JsonFormat[FilePath] = 
jsonFormat1(FilePath.apply)
+
+  case class Port(port: Int)
+
+  /**
+   * Client of [[org.apache.gearpump.util.FileServer]]
+   */
+  class Client(system: ActorSystem, host: String, port: Int) {
+
+    def this(system: ActorSystem, url: String) = {
+      this(system, Uri(url).authority.host.address(), Uri(url).authority.port)
+    }
+
+    private implicit val actorSystem = system
+    private implicit val materializer = ActorMaterializer()
+    private implicit val ec = system.dispatcher
+
+    val server = Uri(s"http://$host:$port";)
+    val httpClient = 
Http(system).outgoingConnection(server.authority.host.address(),
+      server.authority.port)
+
+    def upload(file: File): Future[FilePath] = {
+      val target = server.withPath(Path("/upload"))
+
+      val request = entity(file).map { entity =>
+        HttpRequest(HttpMethods.POST, uri = target, entity = entity)
+      }
+
+      val response = 
Source.fromFuture(request).via(httpClient).runWith(Sink.head)
+      response.flatMap { some =>
+        Unmarshal(some).to[String]
+      }.map { path =>
+        FilePath(path)
+      }
+    }
+
+    def download(remoteFile: FilePath, saveAs: File): Future[Unit] = {
+      val download = server.withPath(Path("/download")).withQuery(Query("file" 
-> remoteFile.path))
+      // Download file to local
+      val response = Source.single(HttpRequest(uri = 
download)).via(httpClient).runWith(Sink.head)
+      val downloaded = response.flatMap { response =>
+        response.entity.dataBytes.runWith(FileIO.toFile(saveAs))
+      }
+      downloaded.map(written => Unit)
+    }
+
+    private def entity(file: File)(implicit ec: ExecutionContext): 
Future[RequestEntity] = {
+      val entity = HttpEntity(MediaTypes.`application/octet-stream`, 
file.length(),
+        FileIO.fromFile(file, chunkSize = 100000))
+      val body = Source.single(
+        Multipart.FormData.BodyPart(
+          "uploadfile",
+          entity,
+          Map("filename" -> file.getName)))
+      val form = Multipart.FormData(body)
+
+      Marshal(form).to[RequestEntity]
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/test/resources/META-INF/services/io.gearpump.jarstore.JarStoreService
----------------------------------------------------------------------
diff --git 
a/daemon/src/test/resources/META-INF/services/io.gearpump.jarstore.JarStoreService
 
b/daemon/src/test/resources/META-INF/services/io.gearpump.jarstore.JarStoreService
deleted file mode 100644
index d226af9..0000000
--- 
a/daemon/src/test/resources/META-INF/services/io.gearpump.jarstore.JarStoreService
+++ /dev/null
@@ -1,20 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-io.gearpump.jarstore.local.LocalJarStoreService
-io.gearpump.jarstore.dfs.DFSJarStoreService
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/test/resources/META-INF/services/org.apache.gearpump.jarstore.JarStoreService
----------------------------------------------------------------------
diff --git 
a/daemon/src/test/resources/META-INF/services/org.apache.gearpump.jarstore.JarStoreService
 
b/daemon/src/test/resources/META-INF/services/org.apache.gearpump.jarstore.JarStoreService
new file mode 100644
index 0000000..bf37316
--- /dev/null
+++ 
b/daemon/src/test/resources/META-INF/services/org.apache.gearpump.jarstore.JarStoreService
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.gearpump.jarstore.local.LocalJarStoreService
+org.apache.gearpump.jarstore.dfs.DFSJarStoreService
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/test/scala/io/gearpump/cluster/MiniCluster.scala
----------------------------------------------------------------------
diff --git a/daemon/src/test/scala/io/gearpump/cluster/MiniCluster.scala 
b/daemon/src/test/scala/io/gearpump/cluster/MiniCluster.scala
deleted file mode 100644
index c6dbbfe..0000000
--- a/daemon/src/test/scala/io/gearpump/cluster/MiniCluster.scala
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.cluster
-
-import scala.concurrent.duration.Duration
-import scala.concurrent.{Await, Future}
-
-import akka.actor.{Actor, ActorRef, ActorSystem, Props}
-import akka.pattern.ask
-import akka.testkit.TestActorRef
-import com.typesafe.config.ConfigValueFactory
-
-import io.gearpump.cluster.AppMasterToMaster.GetAllWorkers
-import io.gearpump.cluster.MasterToAppMaster.WorkerList
-import io.gearpump.cluster.master.Master
-import io.gearpump.cluster.worker.Worker
-import io.gearpump.util.Constants
-
-class MiniCluster {
-  private val mockMasterIP = "127.0.0.1"
-
-  implicit val system = ActorSystem("system", TestUtil.MASTER_CONFIG.
-    withValue(Constants.NETTY_TCP_HOSTNAME, 
ConfigValueFactory.fromAnyRef(mockMasterIP)))
-
-  val (mockMaster, worker) = {
-    val master = system.actorOf(Props(classOf[Master]), "master")
-    val worker = system.actorOf(Props(classOf[Worker], master), "worker")
-
-    // Wait until worker register itself to master
-    waitUtilWorkerIsRegistered(master)
-    (master, worker)
-  }
-
-  def launchActor(props: Props): TestActorRef[Actor] = {
-    TestActorRef(props)
-  }
-
-  private def waitUtilWorkerIsRegistered(master: ActorRef): Unit = {
-    while (!isWorkerRegistered(master)) {}
-  }
-
-  private def isWorkerRegistered(master: ActorRef): Boolean = {
-    import scala.concurrent.duration._
-    implicit val dispatcher = system.dispatcher
-
-    implicit val futureTimeout = Constants.FUTURE_TIMEOUT
-
-    val workerListFuture = (master ? 
GetAllWorkers).asInstanceOf[Future[WorkerList]]
-
-    // Waits until the worker is registered.
-    val workers = Await.result[WorkerList](workerListFuture, 15.seconds)
-    workers.workers.size > 0
-  }
-
-  def shutDown(): Unit = {
-    system.terminate()
-    Await.result(system.whenTerminated, Duration.Inf)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/test/scala/io/gearpump/cluster/main/MainSpec.scala
----------------------------------------------------------------------
diff --git a/daemon/src/test/scala/io/gearpump/cluster/main/MainSpec.scala 
b/daemon/src/test/scala/io/gearpump/cluster/main/MainSpec.scala
deleted file mode 100644
index 30347d2..0000000
--- a/daemon/src/test/scala/io/gearpump/cluster/main/MainSpec.scala
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.cluster.main
-
-import scala.concurrent.Future
-import scala.util.{Success, Try}
-
-import com.typesafe.config.Config
-import org.scalatest._
-
-import io.gearpump.cluster.ClientToMaster.{ResolveAppId, ShutdownApplication}
-import io.gearpump.cluster.MasterToAppMaster.{AppMastersDataRequest, 
ReplayFromTimestampWindowTrailingEdge, _}
-import io.gearpump.cluster.MasterToClient.{ReplayApplicationResult, 
ResolveAppIdResult, ShutdownApplicationResult}
-import io.gearpump.cluster.WorkerToMaster.RegisterNewWorker
-import io.gearpump.cluster.{MasterHarness, TestUtil}
-import io.gearpump.util.Constants._
-import io.gearpump.util.{Constants, LogUtil, Util}
-
-class MainSpec extends FlatSpec with Matchers with BeforeAndAfterEach with 
MasterHarness {
-
-  private val LOG = LogUtil.getLogger(getClass)
-
-  override def config: Config = TestUtil.DEFAULT_CONFIG
-
-  override def beforeEach(): Unit = {
-    startActorSystem()
-  }
-
-  override def afterEach(): Unit = {
-    shutdownActorSystem()
-  }
-
-  "Worker" should "register worker address to master when started." in {
-
-    val masterReceiver = createMockMaster()
-
-    val tempTestConf = convertTestConf(getHost, getPort)
-
-    val options = Array(
-      s"-D$GEARPUMP_CUSTOM_CONFIG_FILE=${tempTestConf.toString}",
-      s"-D${PREFER_IPV4}=true"
-    ) ++ getMasterListOption()
-
-    val worker = Util.startProcess(options,
-      getContextClassPath,
-      getMainClassName(Worker),
-      Array.empty)
-
-    try {
-      masterReceiver.expectMsg(PROCESS_BOOT_TIME, RegisterNewWorker)
-
-      tempTestConf.delete()
-    } finally {
-      worker.destroy()
-    }
-  }
-
-  //  This UT fails a lot on Travis, temporarily delete it.
-  //  "Master" should "accept worker RegisterNewWorker when started" in {
-  //    val worker = TestProbe()(getActorSystem)
-  //
-  //    val port = Util.findFreePort.get
-  //
-  //    val masterConfig =  
Array(s"-D${Constants.GEARPUMP_CLUSTER_MASTERS}.0=127.0.0.1:$port",
-  //      s"-D${Constants.GEARPUMP_HOSTNAME}=127.0.0.1")
-  //
-  //    val masterProcess = Util.startProcess(masterConfig,
-  //      getContextClassPath,
-  //      getMainClassName(io.gearpump.cluster.main.Master),
-  //      Array("-ip", "127.0.0.1", "-port", port.toString))
-  //
-  //    //wait for master process to be started
-  //
-  //    try {
-  //
-  //      val masterProxy = getActorSystem.actorOf(
-  //        MasterProxy.props(List(HostPort("127.0.0.1", port))), "mainSpec")
-  //
-  //      worker.send(masterProxy, RegisterNewWorker)
-  //      worker.expectMsgType[WorkerRegistered](PROCESS_BOOT_TIME)
-  //    } finally {
-  //      masterProcess.destroy()
-  //    }
-  //  }
-
-  "Info" should "be started without exception" in {
-
-    val masterReceiver = createMockMaster()
-
-    Future {
-      io.gearpump.cluster.main.Info.main(masterConfig, Array.empty)
-    }
-
-    masterReceiver.expectMsg(PROCESS_BOOT_TIME, AppMastersDataRequest)
-    masterReceiver.reply(AppMastersData(List(AppMasterData(AppMasterActive, 0, 
"appName"))))
-  }
-
-  "Kill" should "be started without exception" in {
-
-    val masterReceiver = createMockMaster()
-
-    Future {
-      Kill.main(masterConfig, Array("-appid", "0"))
-    }
-
-    masterReceiver.expectMsg(PROCESS_BOOT_TIME, ShutdownApplication(0))
-    masterReceiver.reply(ShutdownApplicationResult(Success(0)))
-  }
-
-  "Replay" should "be started without exception" in {
-
-    val masterReceiver = createMockMaster()
-
-    Future {
-      Replay.main(masterConfig, Array("-appid", "0"))
-    }
-
-    masterReceiver.expectMsgType[ResolveAppId](PROCESS_BOOT_TIME)
-    masterReceiver.reply(ResolveAppIdResult(Success(masterReceiver.ref)))
-    
masterReceiver.expectMsgType[ReplayFromTimestampWindowTrailingEdge](PROCESS_BOOT_TIME)
-    masterReceiver.reply(ReplayApplicationResult(Success(0)))
-  }
-
-  "Local" should "be started without exception" in {
-    val port = Util.findFreePort().get
-    val options = 
Array(s"-D${Constants.GEARPUMP_CLUSTER_MASTERS}.0=$getHost:$port",
-      s"-D${Constants.GEARPUMP_HOSTNAME}=$getHost",
-      s"-D${PREFER_IPV4}=true")
-
-    val local = Util.startProcess(options,
-      getContextClassPath,
-      getMainClassName(Local),
-      Array.empty)
-
-    def retry(times: Int)(fn: => Boolean): Boolean = {
-
-      LOG.info(s"Local Test: Checking whether local port is available, remain 
times $times ..")
-
-      val result = fn
-      if (result || times <= 0) {
-        result
-      } else {
-        Thread.sleep(1000)
-        retry(times - 1)(fn)
-      }
-    }
-
-    try {
-      assert(retry(10)(isPortUsed("127.0.0.1", port)),
-        "local is not started successfully, as port is not used " + port)
-    } finally {
-      local.destroy()
-    }
-  }
-
-  "Gear" should "support app|info|kill|shell|replay" in {
-
-    val commands = Array("app", "info", "kill", "shell", "replay")
-
-    assert(Try(Gear.main(Array.empty)).isSuccess, "print help, no throw")
-
-    for (command <- commands) {
-      assert(Try(Gear.main(Array("-noexist"))).isFailure,
-        "pass unknown option, throw, command: " + command)
-    }
-
-    assert(Try(Gear.main(Array("unknownCommand"))).isFailure, "unknown 
command, throw ")
-
-    val tryThis = Try(Gear.main(Array("unknownCommand", "-noexist")))
-    assert(tryThis.isFailure, "unknown command, throw")
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/test/scala/io/gearpump/cluster/main/MasterWatcherSpec.scala
----------------------------------------------------------------------
diff --git 
a/daemon/src/test/scala/io/gearpump/cluster/main/MasterWatcherSpec.scala 
b/daemon/src/test/scala/io/gearpump/cluster/main/MasterWatcherSpec.scala
deleted file mode 100644
index 66b9ea8..0000000
--- a/daemon/src/test/scala/io/gearpump/cluster/main/MasterWatcherSpec.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.cluster.main
-
-import scala.concurrent.Await
-import scala.concurrent.duration._
-
-import akka.actor.{ActorSystem, Props}
-import akka.testkit.TestProbe
-import com.typesafe.config.Config
-import org.scalatest.{FlatSpec, Matchers}
-
-import io.gearpump.cluster.TestUtil
-
-class MasterWatcherSpec extends FlatSpec with Matchers {
-  def config: Config = TestUtil.MASTER_CONFIG
-
-  "MasterWatcher" should "kill itself when can not get a quorum" in {
-    val system = ActorSystem("ForMasterWatcher", config)
-
-    val actorWatcher = TestProbe()(system)
-
-    val masterWatcher = system.actorOf(Props(classOf[MasterWatcher], 
"watcher"))
-    actorWatcher watch masterWatcher
-    actorWatcher.expectTerminated(masterWatcher, 5.seconds)
-    system.terminate()
-    Await.result(system.whenTerminated, Duration.Inf)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/test/scala/io/gearpump/cluster/master/AppManagerSpec.scala
----------------------------------------------------------------------
diff --git 
a/daemon/src/test/scala/io/gearpump/cluster/master/AppManagerSpec.scala 
b/daemon/src/test/scala/io/gearpump/cluster/master/AppManagerSpec.scala
deleted file mode 100644
index ee6e0e2..0000000
--- a/daemon/src/test/scala/io/gearpump/cluster/master/AppManagerSpec.scala
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.cluster.master
-
-import scala.util.Success
-
-import akka.actor.{Actor, ActorRef, Props}
-import akka.testkit.TestProbe
-import com.typesafe.config.Config
-import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
-
-import io.gearpump.cluster.AppMasterToMaster.{AppDataSaved, _}
-import io.gearpump.cluster.ClientToMaster.{ResolveAppId, ShutdownApplication, 
SubmitApplication}
-import io.gearpump.cluster.MasterToAppMaster.{AppMasterData, 
AppMasterRegistered, AppMastersData, AppMastersDataRequest, _}
-import io.gearpump.cluster.MasterToClient.{ResolveAppIdResult, 
ShutdownApplicationResult, SubmitApplicationResult}
-import io.gearpump.cluster.appmaster.{AppMasterRuntimeInfo, ApplicationState}
-import io.gearpump.cluster.master.AppManager._
-import io.gearpump.cluster.master.InMemoryKVService.{GetKV, GetKVSuccess, 
PutKV, PutKVSuccess}
-import io.gearpump.cluster.{TestUtil, _}
-import io.gearpump.util.LogUtil
-
-class AppManagerSpec extends FlatSpec with Matchers with BeforeAndAfterEach 
with MasterHarness {
-  var kvService: TestProbe = null
-  var haService: TestProbe = null
-  var appLauncher: TestProbe = null
-  var appManager: ActorRef = null
-  private val LOG = LogUtil.getLogger(getClass)
-
-  override def config: Config = TestUtil.DEFAULT_CONFIG
-
-  override def beforeEach(): Unit = {
-    startActorSystem()
-    kvService = TestProbe()(getActorSystem)
-    appLauncher = TestProbe()(getActorSystem)
-
-    appManager = getActorSystem.actorOf(Props(new AppManager(kvService.ref,
-      new DummyAppMasterLauncherFactory(appLauncher))))
-    kvService.expectMsgType[GetKV]
-    kvService.reply(GetKVSuccess(MASTER_STATE, MasterState(0, Map.empty, 
Map.empty)))
-  }
-
-  override def afterEach(): Unit = {
-    shutdownActorSystem()
-  }
-
-  "AppManager" should "handle appmaster message correctly" in {
-    val appMaster = TestProbe()(getActorSystem)
-    val worker = TestProbe()(getActorSystem)
-
-    val register = RegisterAppMaster(appMaster.ref, AppMasterRuntimeInfo(0, 
"appName"))
-    appMaster.send(appManager, register)
-    appMaster.expectMsgType[AppMasterRegistered]
-  }
-
-  "DataStoreService" should "support Put and Get" in {
-    val appMaster = TestProbe()(getActorSystem)
-    appMaster.send(appManager, SaveAppData(0, "key", 1))
-    kvService.expectMsgType[PutKV]
-    kvService.reply(PutKVSuccess)
-    appMaster.expectMsg(AppDataSaved)
-
-    appMaster.send(appManager, GetAppData(0, "key"))
-    kvService.expectMsgType[GetKV]
-    kvService.reply(GetKVSuccess("key", 1))
-    appMaster.expectMsg(GetAppDataResult("key", 1))
-  }
-
-  "AppManager" should "support application submission and shutdown" in {
-    testClientSubmission(withRecover = false)
-  }
-
-  "AppManager" should "support application submission and recover if appmaster 
dies" in {
-    LOG.info("=================testing recover==============")
-    testClientSubmission(withRecover = true)
-  }
-
-  "AppManager" should "handle client message correctly" in {
-    val mockClient = TestProbe()(getActorSystem)
-    mockClient.send(appManager, ShutdownApplication(1))
-    
assert(mockClient.receiveN(1).head.asInstanceOf[ShutdownApplicationResult].appId.isFailure)
-
-    mockClient.send(appManager, ResolveAppId(1))
-    
assert(mockClient.receiveN(1).head.asInstanceOf[ResolveAppIdResult].appMaster.isFailure)
-
-    mockClient.send(appManager, AppMasterDataRequest(1))
-    mockClient.expectMsg(AppMasterData(AppMasterNonExist))
-  }
-
-  "AppManager" should "reject the application submission if the app name 
already existed" in {
-    val app = TestUtil.dummyApp
-    val submit = SubmitApplication(app, None, "username")
-    val client = TestProbe()(getActorSystem)
-    val appMaster = TestProbe()(getActorSystem)
-    val worker = TestProbe()(getActorSystem)
-    val appId = 1
-
-    client.send(appManager, submit)
-
-    kvService.expectMsgType[PutKV]
-    appLauncher.expectMsg(LauncherStarted(appId))
-    appMaster.send(appManager, RegisterAppMaster(appMaster.ref,
-      AppMasterRuntimeInfo(appId, app.name)))
-    appMaster.expectMsgType[AppMasterRegistered]
-
-    client.send(appManager, submit)
-    
assert(client.receiveN(1).head.asInstanceOf[SubmitApplicationResult].appId.isFailure)
-  }
-
-  def testClientSubmission(withRecover: Boolean): Unit = {
-    val app = TestUtil.dummyApp
-    val submit = SubmitApplication(app, None, "username")
-    val client = TestProbe()(getActorSystem)
-    val appMaster = TestProbe()(getActorSystem)
-    val worker = TestProbe()(getActorSystem)
-    val appId = 1
-
-    client.send(appManager, submit)
-
-    kvService.expectMsgType[PutKV]
-    appLauncher.expectMsg(LauncherStarted(appId))
-    appMaster.send(appManager, RegisterAppMaster(appMaster.ref,
-      AppMasterRuntimeInfo(appId, app.name)))
-    kvService.expectMsgType[PutKV]
-    appMaster.expectMsgType[AppMasterRegistered]
-
-    client.send(appManager, ResolveAppId(appId))
-    client.expectMsg(ResolveAppIdResult(Success(appMaster.ref)))
-
-    client.send(appManager, AppMastersDataRequest)
-    client.expectMsgType[AppMastersData]
-
-    client.send(appManager, AppMasterDataRequest(appId, false))
-    client.expectMsgType[AppMasterData]
-
-    if (!withRecover) {
-      client.send(appManager, ShutdownApplication(appId))
-      client.expectMsg(ShutdownApplicationResult(Success(appId)))
-    } else {
-      // Do recovery
-      getActorSystem.stop(appMaster.ref)
-      kvService.expectMsgType[GetKV]
-      val appState = ApplicationState(appId, "application1", 1, app, None, 
"username", null)
-      kvService.reply(GetKVSuccess(APP_STATE, appState))
-      appLauncher.expectMsg(LauncherStarted(appId))
-    }
-  }
-}
-
-class DummyAppMasterLauncherFactory(test: TestProbe) extends 
AppMasterLauncherFactory {
-
-  override def props(appId: Int, executorId: Int, app: AppDescription, jar: 
Option[AppJar],
-      username: String, master: ActorRef, client: Option[ActorRef]): Props = {
-    Props(new DummyAppMasterLauncher(test, appId))
-  }
-}
-
-class DummyAppMasterLauncher(test: TestProbe, appId: Int) extends Actor {
-
-  test.ref ! LauncherStarted(appId)
-  override def receive: Receive = {
-    case any: Any => test.ref forward any
-  }
-}
-
-case class LauncherStarted(appId: Int)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/test/scala/io/gearpump/cluster/master/InMemoryKVServiceSpec.scala
----------------------------------------------------------------------
diff --git 
a/daemon/src/test/scala/io/gearpump/cluster/master/InMemoryKVServiceSpec.scala 
b/daemon/src/test/scala/io/gearpump/cluster/master/InMemoryKVServiceSpec.scala
deleted file mode 100644
index b929349..0000000
--- 
a/daemon/src/test/scala/io/gearpump/cluster/master/InMemoryKVServiceSpec.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.cluster.master
-
-import scala.concurrent.duration._
-
-import akka.actor.Props
-import akka.testkit.TestProbe
-import com.typesafe.config.Config
-import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
-
-import io.gearpump.cluster.master.InMemoryKVService._
-import io.gearpump.cluster.{MasterHarness, TestUtil}
-
-class InMemoryKVServiceSpec
-  extends FlatSpec with Matchers with BeforeAndAfterEach with MasterHarness {
-
-  override def beforeEach(): Unit = {
-    startActorSystem()
-  }
-
-  override def afterEach(): Unit = {
-    shutdownActorSystem()
-  }
-
-  override def config: Config = TestUtil.MASTER_CONFIG
-
-  "KVService" should "get, put, delete correctly" in {
-    val system = getActorSystem
-    val kvService = system.actorOf(Props(new InMemoryKVService()))
-    val group = "group"
-
-    val client = TestProbe()(system)
-
-    client.send(kvService, PutKV(group, "key", 1))
-    client.expectMsg(PutKVSuccess)
-
-    client.send(kvService, PutKV(group, "key", 2))
-    client.expectMsg(PutKVSuccess)
-
-    client.send(kvService, GetKV(group, "key"))
-    client.expectMsg(GetKVSuccess("key", 2))
-
-    client.send(kvService, DeleteKVGroup(group))
-
-    // After DeleteGroup, it no longer accept Get and Put message for this 
group.
-    client.send(kvService, GetKV(group, "key"))
-    client.expectNoMsg(3.seconds)
-
-    client.send(kvService, PutKV(group, "key", 3))
-    client.expectNoMsg(3.seconds)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/test/scala/io/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala
----------------------------------------------------------------------
diff --git 
a/daemon/src/test/scala/io/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala
 
b/daemon/src/test/scala/io/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala
deleted file mode 100644
index a75ade2..0000000
--- 
a/daemon/src/test/scala/io/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala
+++ /dev/null
@@ -1,231 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.cluster.scheduler
-
-import scala.concurrent.duration._
-
-import akka.actor.{ActorSystem, Props}
-import akka.testkit.{ImplicitSender, TestKit, TestProbe}
-import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
-
-import io.gearpump.cluster.AppMasterToMaster.RequestResource
-import io.gearpump.cluster.MasterToAppMaster.ResourceAllocated
-import io.gearpump.cluster.MasterToWorker.{UpdateResourceFailed, 
WorkerRegistered}
-import io.gearpump.cluster.TestUtil
-import io.gearpump.cluster.WorkerToMaster.ResourceUpdate
-import io.gearpump.cluster.master.Master.MasterInfo
-import io.gearpump.cluster.scheduler.Priority.{HIGH, LOW, NORMAL}
-import io.gearpump.cluster.scheduler.Scheduler.ApplicationFinished
-import io.gearpump.cluster.worker.WorkerId
-
-class PrioritySchedulerSpec(_system: ActorSystem) extends TestKit(_system) 
with ImplicitSender
-  with WordSpecLike with Matchers with BeforeAndAfterAll{
-
-  def this() = this(ActorSystem("PrioritySchedulerSpec", 
TestUtil.DEFAULT_CONFIG))
-  val appId = 0
-  val workerId1: WorkerId = WorkerId(1, 0L)
-  val workerId2: WorkerId = WorkerId(2, 0L)
-  val mockAppMaster = TestProbe()
-  val mockWorker1 = TestProbe()
-  val mockWorker2 = TestProbe()
-
-  override def afterAll {
-    TestKit.shutdownActorSystem(system)
-  }
-
-  "The scheduler" should {
-    "update resource only when the worker is registered" in {
-      val scheduler = system.actorOf(Props(classOf[PriorityScheduler]))
-      scheduler ! ResourceUpdate(mockWorker1.ref, workerId1, Resource(100))
-      expectMsg(UpdateResourceFailed(s"ResourceUpdate failed! The worker 
$workerId1 has not been " +
-        s"registered into master"))
-    }
-
-    "drop application's resource requests when the application is removed" in {
-      val scheduler = system.actorOf(Props(classOf[PriorityScheduler]))
-      val request1 = ResourceRequest(Resource(40), WorkerId.unspecified, HIGH, 
Relaxation.ANY)
-      val request2 = ResourceRequest(Resource(20), WorkerId.unspecified, HIGH, 
Relaxation.ANY)
-      scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref)
-      scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref)
-      scheduler.tell(ApplicationFinished(appId), mockAppMaster.ref)
-      scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), 
mockWorker1.ref)
-      scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, 
Resource(100)), mockWorker1.ref)
-      mockAppMaster.expectNoMsg(5.seconds)
-    }
-  }
-
-  def sameElement(left: ResourceAllocated, right: ResourceAllocated): Boolean 
= {
-    
left.allocations.sortBy(_.workerId).sameElements(right.allocations.sortBy(_.workerId))
-  }
-
-  "The resource request with higher priority" should {
-    "be handled first" in {
-      val scheduler = system.actorOf(Props(classOf[PriorityScheduler]))
-      val request1 = ResourceRequest(Resource(40), WorkerId.unspecified, LOW, 
Relaxation.ANY)
-      val request2 = ResourceRequest(Resource(20), WorkerId.unspecified, 
NORMAL, Relaxation.ANY)
-      val request3 = ResourceRequest(Resource(30), WorkerId.unspecified, HIGH, 
Relaxation.ANY)
-
-      scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref)
-      scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref)
-      scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref)
-      scheduler.tell(RequestResource(appId, request3), mockAppMaster.ref)
-      scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), 
mockWorker1.ref)
-      scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, 
Resource(100)), mockWorker1.ref)
-
-      var expect = ResourceAllocated(
-        Array(ResourceAllocation(Resource(30), mockWorker1.ref, workerId1)))
-      mockAppMaster.expectMsgPF(5.seconds) {
-        case request: ResourceAllocated if sameElement(request, expect) => Unit
-      }
-
-      expect = ResourceAllocated(
-        Array(ResourceAllocation(Resource(20), mockWorker1.ref, workerId1)))
-      mockAppMaster.expectMsgPF(5.seconds) {
-        case request: ResourceAllocated if sameElement(request, expect) => Unit
-      }
-
-      expect = ResourceAllocated(
-        Array(ResourceAllocation(Resource(40), mockWorker1.ref, workerId1)))
-      mockAppMaster.expectMsgPF(5.seconds) {
-        case request: ResourceAllocated if sameElement(request, expect) => Unit
-      }
-
-      scheduler.tell(WorkerRegistered(workerId2, MasterInfo.empty), 
mockWorker2.ref)
-      scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, 
Resource.empty), mockWorker1.ref)
-      scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, 
Resource(100)), mockWorker2.ref)
-
-      expect = ResourceAllocated(
-        Array(ResourceAllocation(Resource(40), mockWorker2.ref, workerId2)))
-      mockAppMaster.expectMsgPF(5.seconds) {
-        case request: ResourceAllocated if sameElement(request, expect) => Unit
-      }
-    }
-  }
-
-  "The resource request which delivered earlier" should {
-    "be handled first if the priorities are the same" in {
-      val scheduler = system.actorOf(Props(classOf[PriorityScheduler]))
-      val request1 = ResourceRequest(Resource(40), WorkerId.unspecified, HIGH, 
Relaxation.ANY)
-      val request2 = ResourceRequest(Resource(20), WorkerId.unspecified, HIGH, 
Relaxation.ANY)
-      scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref)
-      scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref)
-      scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), 
mockWorker1.ref)
-      scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, 
Resource(100)), mockWorker1.ref)
-
-      var expect = ResourceAllocated(
-        Array(ResourceAllocation(Resource(40), mockWorker1.ref, workerId1)))
-      mockAppMaster.expectMsgPF(5.seconds) {
-        case request: ResourceAllocated if sameElement(request, expect) => Unit
-      }
-      expect = ResourceAllocated(
-        Array(ResourceAllocation(Resource(20), mockWorker1.ref, workerId1)))
-      mockAppMaster.expectMsgPF(5.seconds) {
-        case request: ResourceAllocated if sameElement(request, expect) => Unit
-      }
-    }
-  }
-
-  "The PriorityScheduler" should {
-    "handle the resource request with different relaxation" in {
-      val scheduler = system.actorOf(Props(classOf[PriorityScheduler]))
-      val request1 = ResourceRequest(Resource(40), workerId2, HIGH, 
Relaxation.SPECIFICWORKER)
-      val request2 = ResourceRequest(Resource(20), workerId1, NORMAL, 
Relaxation.SPECIFICWORKER)
-
-      scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref)
-      scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref)
-      scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), 
mockWorker1.ref)
-      scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, 
Resource(100)), mockWorker1.ref)
-
-      var expect = ResourceAllocated(
-        Array(ResourceAllocation(Resource(20), mockWorker1.ref, workerId1)))
-      mockAppMaster.expectMsgPF(5.seconds) {
-        case request: ResourceAllocated if sameElement(request, expect) => Unit
-      }
-
-      scheduler.tell(WorkerRegistered(workerId2, MasterInfo.empty), 
mockWorker2.ref)
-      scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, 
Resource(100)), mockWorker2.ref)
-
-      expect = ResourceAllocated(
-        Array(ResourceAllocation(Resource(40), mockWorker2.ref, workerId2)))
-      mockAppMaster.expectMsgPF(5.seconds) {
-        case request: ResourceAllocated if sameElement(request, expect) => Unit
-      }
-
-      val request3 = ResourceRequest(
-        Resource(30), WorkerId.unspecified, NORMAL, Relaxation.ANY, 
executorNum = 2)
-      scheduler.tell(RequestResource(appId, request3), mockAppMaster.ref)
-
-      expect = ResourceAllocated(Array(
-        ResourceAllocation(Resource(15), mockWorker1.ref, workerId1),
-        ResourceAllocation(Resource(15), mockWorker2.ref, workerId2)))
-      mockAppMaster.expectMsgPF(5.seconds) {
-        case request: ResourceAllocated if sameElement(request, expect) => Unit
-      }
-
-      // We have to manually update the resource on each worker
-      scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(65)), 
mockWorker1.ref)
-      scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(45)), 
mockWorker2.ref)
-      val request4 = ResourceRequest(Resource(60), WorkerId(0, 0L), NORMAL, 
Relaxation.ONEWORKER)
-      scheduler.tell(RequestResource(appId, request4), mockAppMaster.ref)
-
-      expect = ResourceAllocated(
-        Array(ResourceAllocation(Resource(60), mockWorker1.ref, workerId1)))
-      mockAppMaster.expectMsgPF(5.seconds) {
-        case request: ResourceAllocated if sameElement(request, expect) => Unit
-      }
-    }
-  }
-
-  "The PriorityScheduler" should {
-    "handle the resource request with different executor number" in {
-      val scheduler = system.actorOf(Props(classOf[PriorityScheduler]))
-      scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), 
mockWorker1.ref)
-      scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, 
Resource(100)), mockWorker1.ref)
-      scheduler.tell(WorkerRegistered(workerId2, MasterInfo.empty), 
mockWorker2.ref)
-      scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, 
Resource(100)), mockWorker2.ref)
-
-      // By default, the request requires only one executor
-      val request2 = ResourceRequest(Resource(20), WorkerId.unspecified)
-      scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref)
-      val allocations2 = 
mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated]
-      assert(allocations2.allocations.length == 1)
-      assert(allocations2.allocations.head.resource == Resource(20))
-
-      val request3 = ResourceRequest(Resource(24), WorkerId.unspecified, 
executorNum = 3)
-      scheduler.tell(RequestResource(appId, request3), mockAppMaster.ref)
-      val allocations3 = 
mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated]
-      assert(allocations3.allocations.length == 3)
-      assert(allocations3.allocations.forall(_.resource == Resource(8)))
-
-      // The total available resource can not satisfy the requirements with 
executor number
-      scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(30)), 
mockWorker1.ref)
-      scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(30)), 
mockWorker2.ref)
-      val request4 = ResourceRequest(Resource(60), WorkerId.unspecified, 
executorNum = 3)
-      scheduler.tell(RequestResource(appId, request4), mockAppMaster.ref)
-      val allocations4 = 
mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated]
-      assert(allocations4.allocations.length == 2)
-      assert(allocations4.allocations.forall(_.resource == Resource(20)))
-
-      // When new resources are available, the remaining request will be 
satisfied
-      scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(40)), 
mockWorker1.ref)
-      val allocations5 = 
mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated]
-      assert(allocations5.allocations.length == 1)
-      assert(allocations4.allocations.forall(_.resource == Resource(20)))
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/test/scala/io/gearpump/cluster/worker/WorkerSpec.scala
----------------------------------------------------------------------
diff --git a/daemon/src/test/scala/io/gearpump/cluster/worker/WorkerSpec.scala 
b/daemon/src/test/scala/io/gearpump/cluster/worker/WorkerSpec.scala
deleted file mode 100644
index 46e8d37..0000000
--- a/daemon/src/test/scala/io/gearpump/cluster/worker/WorkerSpec.scala
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.cluster.worker
-
-import scala.concurrent.Await
-import scala.concurrent.duration._
-
-import akka.actor.{ActorSystem, PoisonPill, Props}
-import akka.testkit.TestProbe
-import com.typesafe.config.{Config, ConfigFactory}
-import org.scalatest._
-
-import io.gearpump.cluster.AppMasterToWorker.{ChangeExecutorResource, 
LaunchExecutor, ShutdownExecutor}
-import io.gearpump.cluster.MasterToWorker.{UpdateResourceFailed, 
WorkerRegistered}
-import io.gearpump.cluster.WorkerToAppMaster.{ExecutorLaunchRejected, 
ShutdownExecutorFailed, ShutdownExecutorSucceed}
-import io.gearpump.cluster.WorkerToMaster.{RegisterNewWorker, RegisterWorker, 
ResourceUpdate}
-import io.gearpump.cluster.master.Master.MasterInfo
-import io.gearpump.cluster.scheduler.Resource
-import io.gearpump.cluster.{ExecutorJVMConfig, MasterHarness, TestUtil}
-import io.gearpump.util.{ActorSystemBooter, ActorUtil, Constants}
-
-class WorkerSpec extends WordSpec with Matchers with BeforeAndAfterEach with 
MasterHarness {
-  override def config: Config = TestUtil.DEFAULT_CONFIG
-
-  val appId = 1
-  val workerId: WorkerId = WorkerId(1, 0L)
-  val executorId = 1
-  var masterProxy: TestProbe = null
-  var mockMaster: TestProbe = null
-  var client: TestProbe = null
-  val workerSlots = 50
-
-  override def beforeEach(): Unit = {
-    startActorSystem()
-    mockMaster = TestProbe()(getActorSystem)
-    masterProxy = TestProbe()(getActorSystem)
-    client = TestProbe()(getActorSystem)
-  }
-
-  override def afterEach(): Unit = {
-    shutdownActorSystem()
-  }
-
-  "The new started worker" should {
-    "kill itself if no response from Master after registering" in {
-      val worker = getActorSystem.actorOf(Props(classOf[Worker], 
mockMaster.ref))
-      mockMaster watch worker
-      mockMaster.expectMsg(RegisterNewWorker)
-      mockMaster.expectTerminated(worker, 60.seconds)
-    }
-  }
-
-  "Worker" should {
-    "init its resource from the gearpump config" in {
-      val config = 
ConfigFactory.parseString(s"${Constants.GEARPUMP_WORKER_SLOTS} = $workerSlots").
-        withFallback(TestUtil.DEFAULT_CONFIG)
-      val workerSystem = ActorSystem("WorkerSystem", config)
-      val worker = workerSystem.actorOf(Props(classOf[Worker], mockMaster.ref))
-      mockMaster watch worker
-      mockMaster.expectMsg(RegisterNewWorker)
-
-      worker.tell(WorkerRegistered(workerId, MasterInfo(mockMaster.ref)), 
mockMaster.ref)
-      mockMaster.expectMsg(ResourceUpdate(worker, workerId, 
Resource(workerSlots)))
-
-      worker.tell(
-        UpdateResourceFailed("Test resource update failed", new Exception()), 
mockMaster.ref)
-      mockMaster.expectTerminated(worker, 5.seconds)
-      workerSystem.terminate()
-      Await.result(workerSystem.whenTerminated, Duration.Inf)
-    }
-  }
-
-  "Worker" should {
-    "update its remaining resource when launching and shutting down executors" 
in {
-      val worker = getActorSystem.actorOf(Props(classOf[Worker], 
masterProxy.ref))
-      masterProxy.expectMsg(RegisterNewWorker)
-
-      worker.tell(WorkerRegistered(workerId, MasterInfo(mockMaster.ref)), 
mockMaster.ref)
-      mockMaster.expectMsg(ResourceUpdate(worker, workerId, Resource(100)))
-
-      val executorName = ActorUtil.actorNameForExecutor(appId, executorId)
-      // This is an actor path which the ActorSystemBooter will report back to,
-      // not needed in this test
-      val reportBack = "dummy"
-      val executionContext = ExecutorJVMConfig(Array.empty[String],
-        
getActorSystem.settings.config.getString(Constants.GEARPUMP_APPMASTER_ARGS).split("
 "),
-        classOf[ActorSystemBooter].getName, Array(executorName, reportBack), 
None,
-        username = "user")
-
-      // Test LaunchExecutor
-      worker.tell(LaunchExecutor(appId, executorId, Resource(101), 
executionContext),
-        mockMaster.ref)
-      mockMaster.expectMsg(ExecutorLaunchRejected("There is no free resource 
on this machine"))
-
-      worker.tell(LaunchExecutor(appId, executorId, Resource(5), 
executionContext), mockMaster.ref)
-      mockMaster.expectMsg(ResourceUpdate(worker, workerId, Resource(95)))
-
-      worker.tell(ChangeExecutorResource(appId, executorId, Resource(2)), 
client.ref)
-      mockMaster.expectMsg(ResourceUpdate(worker, workerId, Resource(98)))
-
-      // Test terminationWatch
-      worker.tell(ShutdownExecutor(appId, executorId, "Test shut down 
executor"), client.ref)
-      mockMaster.expectMsg(ResourceUpdate(worker, workerId, Resource(100)))
-      client.expectMsg(ShutdownExecutorSucceed(1, 1))
-
-      worker.tell(ShutdownExecutor(appId, executorId + 1, "Test shut down 
executor"), client.ref)
-      client.expectMsg(ShutdownExecutorFailed(
-        s"Can not find executor ${executorId + 1} for app $appId"))
-
-      mockMaster.ref ! PoisonPill
-      masterProxy.expectMsg(RegisterWorker(workerId))
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/test/scala/io/gearpump/util/FileServerSpec.scala
----------------------------------------------------------------------
diff --git a/daemon/src/test/scala/io/gearpump/util/FileServerSpec.scala 
b/daemon/src/test/scala/io/gearpump/util/FileServerSpec.scala
deleted file mode 100644
index 66c7c1d..0000000
--- a/daemon/src/test/scala/io/gearpump/util/FileServerSpec.scala
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.util
-
-import java.io.File
-import java.util.concurrent.TimeUnit
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-
-import akka.actor.ActorSystem
-import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
-
-import io.gearpump.cluster.TestUtil
-import io.gearpump.google.common.io.Files
-import io.gearpump.jarstore.FilePath
-import io.gearpump.util.FileServer._
-
-class FileServerSpec extends WordSpecLike with Matchers with BeforeAndAfterAll 
{
-
-  implicit val timeout = akka.util.Timeout(25, TimeUnit.SECONDS)
-  val host = "localhost"
-  private val LOG = LogUtil.getLogger(getClass)
-
-  var system: ActorSystem = null
-
-  override def afterAll {
-    if (null != system) {
-      system.terminate()
-      Await.result(system.whenTerminated, Duration.Inf)
-    }
-  }
-
-  override def beforeAll {
-    val config = TestUtil.DEFAULT_CONFIG
-    system = ActorSystem("FileServerSpec", config)
-  }
-
-  private def save(client: Client, data: Array[Byte]): FilePath = {
-    val file = File.createTempFile("fileserverspec", "test")
-    FileUtils.writeByteArrayToFile(file, data)
-    val future = client.upload(file)
-    import scala.concurrent.duration._
-    val path = Await.result(future, 30.seconds)
-    file.delete()
-    path
-  }
-
-  private def get(client: Client, remote: FilePath): Array[Byte] = {
-    val file = File.createTempFile("fileserverspec", "test")
-    val future = client.download(remote, file)
-    import scala.concurrent.duration._
-    val data = Await.result(future, 10.seconds)
-
-    val bytes = FileUtils.readFileToByteArray(file)
-    file.delete()
-    bytes
-  }
-
-  "The file server" should {
-    "serve the data previously stored" in {
-
-      val rootDir = Files.createTempDir()
-
-      val server = new FileServer(system, host, 0, rootDir)
-      val port = Await.result((server.start), Duration(25, TimeUnit.SECONDS))
-
-      LOG.info("start test web server on port " + port)
-
-      val sizes = List(1, 100, 1000000, 50000000)
-      val client = new Client(system, host, port.port)
-
-      sizes.foreach { size =>
-        val bytes = randomBytes(size)
-        val url = s"http://$host:${port.port}/$size";
-        val remote = save(client, bytes)
-        val fetchedBytes = get(client, remote)
-        assert(fetchedBytes sameElements bytes, s"fetch data is coruppted, 
$url, $rootDir")
-      }
-      server.stop
-      rootDir.delete()
-    }
-  }
-
-  "The file server" should {
-    "handle missed file" in {
-
-      val rootDir = Files.createTempDir()
-
-      val server = new FileServer(system, host, 0, rootDir)
-      val port = Await.result((server.start), Duration(25, TimeUnit.SECONDS))
-
-      val client = new Client(system, host, port.port)
-      val fetchedBytes = get(client, FilePath("noexist"))
-      assert(fetchedBytes.length == 0)
-      rootDir.delete()
-    }
-  }
-
-  private def randomBytes(size: Int): Array[Byte] = {
-    val bytes = new Array[Byte](size)
-    (new java.util.Random()).nextBytes(bytes)
-    bytes
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/test/scala/org/apache/gearpump/cluster/MiniCluster.scala
----------------------------------------------------------------------
diff --git 
a/daemon/src/test/scala/org/apache/gearpump/cluster/MiniCluster.scala 
b/daemon/src/test/scala/org/apache/gearpump/cluster/MiniCluster.scala
new file mode 100644
index 0000000..a6b75cb
--- /dev/null
+++ b/daemon/src/test/scala/org/apache/gearpump/cluster/MiniCluster.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster
+
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, Future}
+
+import akka.actor.{Actor, ActorRef, ActorSystem, Props}
+import akka.pattern.ask
+import akka.testkit.TestActorRef
+import com.typesafe.config.ConfigValueFactory
+
+import org.apache.gearpump.cluster.AppMasterToMaster.GetAllWorkers
+import org.apache.gearpump.cluster.MasterToAppMaster.WorkerList
+import org.apache.gearpump.cluster.master.Master
+import org.apache.gearpump.cluster.worker.Worker
+import org.apache.gearpump.util.Constants
+
+class MiniCluster {
+  private val mockMasterIP = "127.0.0.1"
+
+  implicit val system = ActorSystem("system", TestUtil.MASTER_CONFIG.
+    withValue(Constants.NETTY_TCP_HOSTNAME, 
ConfigValueFactory.fromAnyRef(mockMasterIP)))
+
+  val (mockMaster, worker) = {
+    val master = system.actorOf(Props(classOf[Master]), "master")
+    val worker = system.actorOf(Props(classOf[Worker], master), "worker")
+
+    // Wait until worker register itself to master
+    waitUtilWorkerIsRegistered(master)
+    (master, worker)
+  }
+
+  def launchActor(props: Props): TestActorRef[Actor] = {
+    TestActorRef(props)
+  }
+
+  private def waitUtilWorkerIsRegistered(master: ActorRef): Unit = {
+    while (!isWorkerRegistered(master)) {}
+  }
+
+  private def isWorkerRegistered(master: ActorRef): Boolean = {
+    import scala.concurrent.duration._
+    implicit val dispatcher = system.dispatcher
+
+    implicit val futureTimeout = Constants.FUTURE_TIMEOUT
+
+    val workerListFuture = (master ? 
GetAllWorkers).asInstanceOf[Future[WorkerList]]
+
+    // Waits until the worker is registered.
+    val workers = Await.result[WorkerList](workerListFuture, 15.seconds)
+    workers.workers.size > 0
+  }
+
+  def shutDown(): Unit = {
+    system.terminate()
+    Await.result(system.whenTerminated, Duration.Inf)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/test/scala/org/apache/gearpump/cluster/main/MainSpec.scala
----------------------------------------------------------------------
diff --git 
a/daemon/src/test/scala/org/apache/gearpump/cluster/main/MainSpec.scala 
b/daemon/src/test/scala/org/apache/gearpump/cluster/main/MainSpec.scala
new file mode 100644
index 0000000..205bb49
--- /dev/null
+++ b/daemon/src/test/scala/org/apache/gearpump/cluster/main/MainSpec.scala
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.main
+
+import scala.concurrent.Future
+import scala.util.{Success, Try}
+
+import com.typesafe.config.Config
+import org.scalatest._
+
+import org.apache.gearpump.cluster.ClientToMaster.{ResolveAppId, 
ShutdownApplication}
+import org.apache.gearpump.cluster.MasterToAppMaster.{AppMastersDataRequest, 
ReplayFromTimestampWindowTrailingEdge, _}
+import org.apache.gearpump.cluster.MasterToClient.{ReplayApplicationResult, 
ResolveAppIdResult, ShutdownApplicationResult}
+import org.apache.gearpump.cluster.WorkerToMaster.RegisterNewWorker
+import org.apache.gearpump.cluster.{MasterHarness, TestUtil}
+import org.apache.gearpump.util.Constants._
+import org.apache.gearpump.util.{Constants, LogUtil, Util}
+
+class MainSpec extends FlatSpec with Matchers with BeforeAndAfterEach with 
MasterHarness {
+
+  private val LOG = LogUtil.getLogger(getClass)
+
+  override def config: Config = TestUtil.DEFAULT_CONFIG
+
+  override def beforeEach(): Unit = {
+    startActorSystem()
+  }
+
+  override def afterEach(): Unit = {
+    shutdownActorSystem()
+  }
+
+  "Worker" should "register worker address to master when started." in {
+
+    val masterReceiver = createMockMaster()
+
+    val tempTestConf = convertTestConf(getHost, getPort)
+
+    val options = Array(
+      s"-D$GEARPUMP_CUSTOM_CONFIG_FILE=${tempTestConf.toString}",
+      s"-D${PREFER_IPV4}=true"
+    ) ++ getMasterListOption()
+
+    val worker = Util.startProcess(options,
+      getContextClassPath,
+      getMainClassName(Worker),
+      Array.empty)
+
+    try {
+      masterReceiver.expectMsg(PROCESS_BOOT_TIME, RegisterNewWorker)
+
+      tempTestConf.delete()
+    } finally {
+      worker.destroy()
+    }
+  }
+
+  //  This UT fails a lot on Travis, temporarily delete it.
+  //  "Master" should "accept worker RegisterNewWorker when started" in {
+  //    val worker = TestProbe()(getActorSystem)
+  //
+  //    val port = Util.findFreePort.get
+  //
+  //    val masterConfig =  
Array(s"-D${Constants.GEARPUMP_CLUSTER_MASTERS}.0=127.0.0.1:$port",
+  //      s"-D${Constants.GEARPUMP_HOSTNAME}=127.0.0.1")
+  //
+  //    val masterProcess = Util.startProcess(masterConfig,
+  //      getContextClassPath,
+  //      getMainClassName(org.apache.gearpump.cluster.main.Master),
+  //      Array("-ip", "127.0.0.1", "-port", port.toString))
+  //
+  //    //wait for master process to be started
+  //
+  //    try {
+  //
+  //      val masterProxy = getActorSystem.actorOf(
+  //        MasterProxy.props(List(HostPort("127.0.0.1", port))), "mainSpec")
+  //
+  //      worker.send(masterProxy, RegisterNewWorker)
+  //      worker.expectMsgType[WorkerRegistered](PROCESS_BOOT_TIME)
+  //    } finally {
+  //      masterProcess.destroy()
+  //    }
+  //  }
+
+  "Info" should "be started without exception" in {
+
+    val masterReceiver = createMockMaster()
+
+    Future {
+      org.apache.gearpump.cluster.main.Info.main(masterConfig, Array.empty)
+    }
+
+    masterReceiver.expectMsg(PROCESS_BOOT_TIME, AppMastersDataRequest)
+    masterReceiver.reply(AppMastersData(List(AppMasterData(AppMasterActive, 0, 
"appName"))))
+  }
+
+  "Kill" should "be started without exception" in {
+
+    val masterReceiver = createMockMaster()
+
+    Future {
+      Kill.main(masterConfig, Array("-appid", "0"))
+    }
+
+    masterReceiver.expectMsg(PROCESS_BOOT_TIME, ShutdownApplication(0))
+    masterReceiver.reply(ShutdownApplicationResult(Success(0)))
+  }
+
+  "Replay" should "be started without exception" in {
+
+    val masterReceiver = createMockMaster()
+
+    Future {
+      Replay.main(masterConfig, Array("-appid", "0"))
+    }
+
+    masterReceiver.expectMsgType[ResolveAppId](PROCESS_BOOT_TIME)
+    masterReceiver.reply(ResolveAppIdResult(Success(masterReceiver.ref)))
+    
masterReceiver.expectMsgType[ReplayFromTimestampWindowTrailingEdge](PROCESS_BOOT_TIME)
+    masterReceiver.reply(ReplayApplicationResult(Success(0)))
+  }
+
+  "Local" should "be started without exception" in {
+    val port = Util.findFreePort().get
+    val options = 
Array(s"-D${Constants.GEARPUMP_CLUSTER_MASTERS}.0=$getHost:$port",
+      s"-D${Constants.GEARPUMP_HOSTNAME}=$getHost",
+      s"-D${PREFER_IPV4}=true")
+
+    val local = Util.startProcess(options,
+      getContextClassPath,
+      getMainClassName(Local),
+      Array.empty)
+
+    def retry(times: Int)(fn: => Boolean): Boolean = {
+
+      LOG.info(s"Local Test: Checking whether local port is available, remain 
times $times ..")
+
+      val result = fn
+      if (result || times <= 0) {
+        result
+      } else {
+        Thread.sleep(1000)
+        retry(times - 1)(fn)
+      }
+    }
+
+    try {
+      assert(retry(10)(isPortUsed("127.0.0.1", port)),
+        "local is not started successfully, as port is not used " + port)
+    } finally {
+      local.destroy()
+    }
+  }
+
+  "Gear" should "support app|info|kill|shell|replay" in {
+
+    val commands = Array("app", "info", "kill", "shell", "replay")
+
+    assert(Try(Gear.main(Array.empty)).isSuccess, "print help, no throw")
+
+    for (command <- commands) {
+      assert(Try(Gear.main(Array("-noexist"))).isFailure,
+        "pass unknown option, throw, command: " + command)
+    }
+
+    assert(Try(Gear.main(Array("unknownCommand"))).isFailure, "unknown 
command, throw ")
+
+    val tryThis = Try(Gear.main(Array("unknownCommand", "-noexist")))
+    assert(tryThis.isFailure, "unknown command, throw")
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/test/scala/org/apache/gearpump/cluster/main/MasterWatcherSpec.scala
----------------------------------------------------------------------
diff --git 
a/daemon/src/test/scala/org/apache/gearpump/cluster/main/MasterWatcherSpec.scala
 
b/daemon/src/test/scala/org/apache/gearpump/cluster/main/MasterWatcherSpec.scala
new file mode 100644
index 0000000..e1ba8f6
--- /dev/null
+++ 
b/daemon/src/test/scala/org/apache/gearpump/cluster/main/MasterWatcherSpec.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster.main
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+import akka.actor.{ActorSystem, Props}
+import akka.testkit.TestProbe
+import com.typesafe.config.Config
+import org.scalatest.{FlatSpec, Matchers}
+
+import org.apache.gearpump.cluster.TestUtil
+
+class MasterWatcherSpec extends FlatSpec with Matchers {
+  def config: Config = TestUtil.MASTER_CONFIG
+
+  "MasterWatcher" should "kill itself when can not get a quorum" in {
+    val system = ActorSystem("ForMasterWatcher", config)
+
+    val actorWatcher = TestProbe()(system)
+
+    val masterWatcher = system.actorOf(Props(classOf[MasterWatcher], 
"watcher"))
+    actorWatcher watch masterWatcher
+    actorWatcher.expectTerminated(masterWatcher, 5.seconds)
+    system.terminate()
+    Await.result(system.whenTerminated, Duration.Inf)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/test/scala/org/apache/gearpump/cluster/master/AppManagerSpec.scala
----------------------------------------------------------------------
diff --git 
a/daemon/src/test/scala/org/apache/gearpump/cluster/master/AppManagerSpec.scala 
b/daemon/src/test/scala/org/apache/gearpump/cluster/master/AppManagerSpec.scala
new file mode 100644
index 0000000..ae0ebcd
--- /dev/null
+++ 
b/daemon/src/test/scala/org/apache/gearpump/cluster/master/AppManagerSpec.scala
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.master
+
+import scala.util.Success
+
+import akka.actor.{Actor, ActorRef, Props}
+import akka.testkit.TestProbe
+import com.typesafe.config.Config
+import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
+
+import org.apache.gearpump.cluster.AppMasterToMaster.{AppDataSaved, _}
+import org.apache.gearpump.cluster.ClientToMaster.{ResolveAppId, 
ShutdownApplication, SubmitApplication}
+import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterData, 
AppMasterRegistered, AppMastersData, AppMastersDataRequest, _}
+import org.apache.gearpump.cluster.MasterToClient.{ResolveAppIdResult, 
ShutdownApplicationResult, SubmitApplicationResult}
+import org.apache.gearpump.cluster.appmaster.{AppMasterRuntimeInfo, 
ApplicationState}
+import org.apache.gearpump.cluster.master.AppManager._
+import org.apache.gearpump.cluster.master.InMemoryKVService.{GetKV, 
GetKVSuccess, PutKV, PutKVSuccess}
+import org.apache.gearpump.cluster.{TestUtil, _}
+import org.apache.gearpump.util.LogUtil
+
+class AppManagerSpec extends FlatSpec with Matchers with BeforeAndAfterEach 
with MasterHarness {
+  var kvService: TestProbe = null
+  var haService: TestProbe = null
+  var appLauncher: TestProbe = null
+  var appManager: ActorRef = null
+  private val LOG = LogUtil.getLogger(getClass)
+
+  override def config: Config = TestUtil.DEFAULT_CONFIG
+
+  override def beforeEach(): Unit = {
+    startActorSystem()
+    kvService = TestProbe()(getActorSystem)
+    appLauncher = TestProbe()(getActorSystem)
+
+    appManager = getActorSystem.actorOf(Props(new AppManager(kvService.ref,
+      new DummyAppMasterLauncherFactory(appLauncher))))
+    kvService.expectMsgType[GetKV]
+    kvService.reply(GetKVSuccess(MASTER_STATE, MasterState(0, Map.empty, 
Map.empty)))
+  }
+
+  override def afterEach(): Unit = {
+    shutdownActorSystem()
+  }
+
+  "AppManager" should "handle appmaster message correctly" in {
+    val appMaster = TestProbe()(getActorSystem)
+    val worker = TestProbe()(getActorSystem)
+
+    val register = RegisterAppMaster(appMaster.ref, AppMasterRuntimeInfo(0, 
"appName"))
+    appMaster.send(appManager, register)
+    appMaster.expectMsgType[AppMasterRegistered]
+  }
+
+  "DataStoreService" should "support Put and Get" in {
+    val appMaster = TestProbe()(getActorSystem)
+    appMaster.send(appManager, SaveAppData(0, "key", 1))
+    kvService.expectMsgType[PutKV]
+    kvService.reply(PutKVSuccess)
+    appMaster.expectMsg(AppDataSaved)
+
+    appMaster.send(appManager, GetAppData(0, "key"))
+    kvService.expectMsgType[GetKV]
+    kvService.reply(GetKVSuccess("key", 1))
+    appMaster.expectMsg(GetAppDataResult("key", 1))
+  }
+
+  "AppManager" should "support application submission and shutdown" in {
+    testClientSubmission(withRecover = false)
+  }
+
+  "AppManager" should "support application submission and recover if appmaster 
dies" in {
+    LOG.info("=================testing recover==============")
+    testClientSubmission(withRecover = true)
+  }
+
+  "AppManager" should "handle client message correctly" in {
+    val mockClient = TestProbe()(getActorSystem)
+    mockClient.send(appManager, ShutdownApplication(1))
+    
assert(mockClient.receiveN(1).head.asInstanceOf[ShutdownApplicationResult].appId.isFailure)
+
+    mockClient.send(appManager, ResolveAppId(1))
+    
assert(mockClient.receiveN(1).head.asInstanceOf[ResolveAppIdResult].appMaster.isFailure)
+
+    mockClient.send(appManager, AppMasterDataRequest(1))
+    mockClient.expectMsg(AppMasterData(AppMasterNonExist))
+  }
+
+  "AppManager" should "reject the application submission if the app name 
already existed" in {
+    val app = TestUtil.dummyApp
+    val submit = SubmitApplication(app, None, "username")
+    val client = TestProbe()(getActorSystem)
+    val appMaster = TestProbe()(getActorSystem)
+    val worker = TestProbe()(getActorSystem)
+    val appId = 1
+
+    client.send(appManager, submit)
+
+    kvService.expectMsgType[PutKV]
+    appLauncher.expectMsg(LauncherStarted(appId))
+    appMaster.send(appManager, RegisterAppMaster(appMaster.ref,
+      AppMasterRuntimeInfo(appId, app.name)))
+    appMaster.expectMsgType[AppMasterRegistered]
+
+    client.send(appManager, submit)
+    
assert(client.receiveN(1).head.asInstanceOf[SubmitApplicationResult].appId.isFailure)
+  }
+
+  def testClientSubmission(withRecover: Boolean): Unit = {
+    val app = TestUtil.dummyApp
+    val submit = SubmitApplication(app, None, "username")
+    val client = TestProbe()(getActorSystem)
+    val appMaster = TestProbe()(getActorSystem)
+    val worker = TestProbe()(getActorSystem)
+    val appId = 1
+
+    client.send(appManager, submit)
+
+    kvService.expectMsgType[PutKV]
+    appLauncher.expectMsg(LauncherStarted(appId))
+    appMaster.send(appManager, RegisterAppMaster(appMaster.ref,
+      AppMasterRuntimeInfo(appId, app.name)))
+    kvService.expectMsgType[PutKV]
+    appMaster.expectMsgType[AppMasterRegistered]
+
+    client.send(appManager, ResolveAppId(appId))
+    client.expectMsg(ResolveAppIdResult(Success(appMaster.ref)))
+
+    client.send(appManager, AppMastersDataRequest)
+    client.expectMsgType[AppMastersData]
+
+    client.send(appManager, AppMasterDataRequest(appId, false))
+    client.expectMsgType[AppMasterData]
+
+    if (!withRecover) {
+      client.send(appManager, ShutdownApplication(appId))
+      client.expectMsg(ShutdownApplicationResult(Success(appId)))
+    } else {
+      // Do recovery
+      getActorSystem.stop(appMaster.ref)
+      kvService.expectMsgType[GetKV]
+      val appState = ApplicationState(appId, "application1", 1, app, None, 
"username", null)
+      kvService.reply(GetKVSuccess(APP_STATE, appState))
+      appLauncher.expectMsg(LauncherStarted(appId))
+    }
+  }
+}
+
+class DummyAppMasterLauncherFactory(test: TestProbe) extends 
AppMasterLauncherFactory {
+
+  override def props(appId: Int, executorId: Int, app: AppDescription, jar: 
Option[AppJar],
+      username: String, master: ActorRef, client: Option[ActorRef]): Props = {
+    Props(new DummyAppMasterLauncher(test, appId))
+  }
+}
+
+class DummyAppMasterLauncher(test: TestProbe, appId: Int) extends Actor {
+
+  test.ref ! LauncherStarted(appId)
+  override def receive: Receive = {
+    case any: Any => test.ref forward any
+  }
+}
+
+case class LauncherStarted(appId: Int)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/test/scala/org/apache/gearpump/cluster/master/InMemoryKVServiceSpec.scala
----------------------------------------------------------------------
diff --git 
a/daemon/src/test/scala/org/apache/gearpump/cluster/master/InMemoryKVServiceSpec.scala
 
b/daemon/src/test/scala/org/apache/gearpump/cluster/master/InMemoryKVServiceSpec.scala
new file mode 100644
index 0000000..325a484
--- /dev/null
+++ 
b/daemon/src/test/scala/org/apache/gearpump/cluster/master/InMemoryKVServiceSpec.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.master
+
+import scala.concurrent.duration._
+
+import akka.actor.Props
+import akka.testkit.TestProbe
+import com.typesafe.config.Config
+import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
+
+import org.apache.gearpump.cluster.master.InMemoryKVService._
+import org.apache.gearpump.cluster.{MasterHarness, TestUtil}
+
+class InMemoryKVServiceSpec
+  extends FlatSpec with Matchers with BeforeAndAfterEach with MasterHarness {
+
+  override def beforeEach(): Unit = {
+    startActorSystem()
+  }
+
+  override def afterEach(): Unit = {
+    shutdownActorSystem()
+  }
+
+  override def config: Config = TestUtil.MASTER_CONFIG
+
+  "KVService" should "get, put, delete correctly" in {
+    val system = getActorSystem
+    val kvService = system.actorOf(Props(new InMemoryKVService()))
+    val group = "group"
+
+    val client = TestProbe()(system)
+
+    client.send(kvService, PutKV(group, "key", 1))
+    client.expectMsg(PutKVSuccess)
+
+    client.send(kvService, PutKV(group, "key", 2))
+    client.expectMsg(PutKVSuccess)
+
+    client.send(kvService, GetKV(group, "key"))
+    client.expectMsg(GetKVSuccess("key", 2))
+
+    client.send(kvService, DeleteKVGroup(group))
+
+    // After DeleteGroup, it no longer accept Get and Put message for this 
group.
+    client.send(kvService, GetKV(group, "key"))
+    client.expectNoMsg(3.seconds)
+
+    client.send(kvService, PutKV(group, "key", 3))
+    client.expectNoMsg(3.seconds)
+  }
+}

Reply via email to