http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/test/scala/org/apache/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala
----------------------------------------------------------------------
diff --git 
a/daemon/src/test/scala/org/apache/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala
 
b/daemon/src/test/scala/org/apache/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala
new file mode 100644
index 0000000..e82dff3
--- /dev/null
+++ 
b/daemon/src/test/scala/org/apache/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster.scheduler
+
+import org.apache.gearpump.cluster.worker.WorkerId
+
+import scala.concurrent.duration._
+
+import akka.actor.{ActorSystem, Props}
+import akka.testkit.{ImplicitSender, TestKit, TestProbe}
+import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
+
+import org.apache.gearpump.cluster.AppMasterToMaster.RequestResource
+import org.apache.gearpump.cluster.MasterToAppMaster.ResourceAllocated
+import org.apache.gearpump.cluster.MasterToWorker.{UpdateResourceFailed, 
WorkerRegistered}
+import org.apache.gearpump.cluster.TestUtil
+import org.apache.gearpump.cluster.WorkerToMaster.ResourceUpdate
+import org.apache.gearpump.cluster.master.Master.MasterInfo
+import org.apache.gearpump.cluster.scheduler.Priority.{HIGH, LOW, NORMAL}
+import org.apache.gearpump.cluster.scheduler.Scheduler.ApplicationFinished
+
+class PrioritySchedulerSpec(_system: ActorSystem) extends TestKit(_system) 
with ImplicitSender
+  with WordSpecLike with Matchers with BeforeAndAfterAll{
+
+  def this() = this(ActorSystem("PrioritySchedulerSpec", 
TestUtil.DEFAULT_CONFIG))
+  val appId = 0
+  val workerId1: WorkerId = WorkerId(1, 0L)
+  val workerId2: WorkerId = WorkerId(2, 0L)
+  val mockAppMaster = TestProbe()
+  val mockWorker1 = TestProbe()
+  val mockWorker2 = TestProbe()
+
+  override def afterAll {
+    TestKit.shutdownActorSystem(system)
+  }
+
+  "The scheduler" should {
+    "update resource only when the worker is registered" in {
+      val scheduler = system.actorOf(Props(classOf[PriorityScheduler]))
+      scheduler ! ResourceUpdate(mockWorker1.ref, workerId1, Resource(100))
+      expectMsg(UpdateResourceFailed(s"ResourceUpdate failed! The worker 
$workerId1 has not been " +
+        s"registered into master"))
+    }
+
+    "drop application's resource requests when the application is removed" in {
+      val scheduler = system.actorOf(Props(classOf[PriorityScheduler]))
+      val request1 = ResourceRequest(Resource(40), WorkerId.unspecified, HIGH, 
Relaxation.ANY)
+      val request2 = ResourceRequest(Resource(20), WorkerId.unspecified, HIGH, 
Relaxation.ANY)
+      scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref)
+      scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref)
+      scheduler.tell(ApplicationFinished(appId), mockAppMaster.ref)
+      scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), 
mockWorker1.ref)
+      scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, 
Resource(100)), mockWorker1.ref)
+      mockAppMaster.expectNoMsg(5.seconds)
+    }
+  }
+
+  def sameElement(left: ResourceAllocated, right: ResourceAllocated): Boolean 
= {
+    
left.allocations.sortBy(_.workerId).sameElements(right.allocations.sortBy(_.workerId))
+  }
+
+  "The resource request with higher priority" should {
+    "be handled first" in {
+      val scheduler = system.actorOf(Props(classOf[PriorityScheduler]))
+      val request1 = ResourceRequest(Resource(40), WorkerId.unspecified, LOW, 
Relaxation.ANY)
+      val request2 = ResourceRequest(Resource(20), WorkerId.unspecified, 
NORMAL, Relaxation.ANY)
+      val request3 = ResourceRequest(Resource(30), WorkerId.unspecified, HIGH, 
Relaxation.ANY)
+
+      scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref)
+      scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref)
+      scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref)
+      scheduler.tell(RequestResource(appId, request3), mockAppMaster.ref)
+      scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), 
mockWorker1.ref)
+      scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, 
Resource(100)), mockWorker1.ref)
+
+      var expect = ResourceAllocated(
+        Array(ResourceAllocation(Resource(30), mockWorker1.ref, workerId1)))
+      mockAppMaster.expectMsgPF(5.seconds) {
+        case request: ResourceAllocated if sameElement(request, expect) => Unit
+      }
+
+      expect = ResourceAllocated(
+        Array(ResourceAllocation(Resource(20), mockWorker1.ref, workerId1)))
+      mockAppMaster.expectMsgPF(5.seconds) {
+        case request: ResourceAllocated if sameElement(request, expect) => Unit
+      }
+
+      expect = ResourceAllocated(
+        Array(ResourceAllocation(Resource(40), mockWorker1.ref, workerId1)))
+      mockAppMaster.expectMsgPF(5.seconds) {
+        case request: ResourceAllocated if sameElement(request, expect) => Unit
+      }
+
+      scheduler.tell(WorkerRegistered(workerId2, MasterInfo.empty), 
mockWorker2.ref)
+      scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, 
Resource.empty), mockWorker1.ref)
+      scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, 
Resource(100)), mockWorker2.ref)
+
+      expect = ResourceAllocated(
+        Array(ResourceAllocation(Resource(40), mockWorker2.ref, workerId2)))
+      mockAppMaster.expectMsgPF(5.seconds) {
+        case request: ResourceAllocated if sameElement(request, expect) => Unit
+      }
+    }
+  }
+
+  "The resource request which delivered earlier" should {
+    "be handled first if the priorities are the same" in {
+      val scheduler = system.actorOf(Props(classOf[PriorityScheduler]))
+      val request1 = ResourceRequest(Resource(40), WorkerId.unspecified, HIGH, 
Relaxation.ANY)
+      val request2 = ResourceRequest(Resource(20), WorkerId.unspecified, HIGH, 
Relaxation.ANY)
+      scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref)
+      scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref)
+      scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), 
mockWorker1.ref)
+      scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, 
Resource(100)), mockWorker1.ref)
+
+      var expect = ResourceAllocated(
+        Array(ResourceAllocation(Resource(40), mockWorker1.ref, workerId1)))
+      mockAppMaster.expectMsgPF(5.seconds) {
+        case request: ResourceAllocated if sameElement(request, expect) => Unit
+      }
+      expect = ResourceAllocated(
+        Array(ResourceAllocation(Resource(20), mockWorker1.ref, workerId1)))
+      mockAppMaster.expectMsgPF(5.seconds) {
+        case request: ResourceAllocated if sameElement(request, expect) => Unit
+      }
+    }
+  }
+
+  "The PriorityScheduler" should {
+    "handle the resource request with different relaxation" in {
+      val scheduler = system.actorOf(Props(classOf[PriorityScheduler]))
+      val request1 = ResourceRequest(Resource(40), workerId2, HIGH, 
Relaxation.SPECIFICWORKER)
+      val request2 = ResourceRequest(Resource(20), workerId1, NORMAL, 
Relaxation.SPECIFICWORKER)
+
+      scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref)
+      scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref)
+      scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), 
mockWorker1.ref)
+      scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, 
Resource(100)), mockWorker1.ref)
+
+      var expect = ResourceAllocated(
+        Array(ResourceAllocation(Resource(20), mockWorker1.ref, workerId1)))
+      mockAppMaster.expectMsgPF(5.seconds) {
+        case request: ResourceAllocated if sameElement(request, expect) => Unit
+      }
+
+      scheduler.tell(WorkerRegistered(workerId2, MasterInfo.empty), 
mockWorker2.ref)
+      scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, 
Resource(100)), mockWorker2.ref)
+
+      expect = ResourceAllocated(
+        Array(ResourceAllocation(Resource(40), mockWorker2.ref, workerId2)))
+      mockAppMaster.expectMsgPF(5.seconds) {
+        case request: ResourceAllocated if sameElement(request, expect) => Unit
+      }
+
+      val request3 = ResourceRequest(
+        Resource(30), WorkerId.unspecified, NORMAL, Relaxation.ANY, 
executorNum = 2)
+      scheduler.tell(RequestResource(appId, request3), mockAppMaster.ref)
+
+      expect = ResourceAllocated(Array(
+        ResourceAllocation(Resource(15), mockWorker1.ref, workerId1),
+        ResourceAllocation(Resource(15), mockWorker2.ref, workerId2)))
+      mockAppMaster.expectMsgPF(5.seconds) {
+        case request: ResourceAllocated if sameElement(request, expect) => Unit
+      }
+
+      // We have to manually update the resource on each worker
+      scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(65)), 
mockWorker1.ref)
+      scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(45)), 
mockWorker2.ref)
+      val request4 = ResourceRequest(Resource(60), WorkerId(0, 0L), NORMAL, 
Relaxation.ONEWORKER)
+      scheduler.tell(RequestResource(appId, request4), mockAppMaster.ref)
+
+      expect = ResourceAllocated(
+        Array(ResourceAllocation(Resource(60), mockWorker1.ref, workerId1)))
+      mockAppMaster.expectMsgPF(5.seconds) {
+        case request: ResourceAllocated if sameElement(request, expect) => Unit
+      }
+    }
+  }
+
+  "The PriorityScheduler" should {
+    "handle the resource request with different executor number" in {
+      val scheduler = system.actorOf(Props(classOf[PriorityScheduler]))
+      scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), 
mockWorker1.ref)
+      scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, 
Resource(100)), mockWorker1.ref)
+      scheduler.tell(WorkerRegistered(workerId2, MasterInfo.empty), 
mockWorker2.ref)
+      scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, 
Resource(100)), mockWorker2.ref)
+
+      // By default, the request requires only one executor
+      val request2 = ResourceRequest(Resource(20), WorkerId.unspecified)
+      scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref)
+      val allocations2 = 
mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated]
+      assert(allocations2.allocations.length == 1)
+      assert(allocations2.allocations.head.resource == Resource(20))
+
+      val request3 = ResourceRequest(Resource(24), WorkerId.unspecified, 
executorNum = 3)
+      scheduler.tell(RequestResource(appId, request3), mockAppMaster.ref)
+      val allocations3 = 
mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated]
+      assert(allocations3.allocations.length == 3)
+      assert(allocations3.allocations.forall(_.resource == Resource(8)))
+
+      // The total available resource can not satisfy the requirements with 
executor number
+      scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(30)), 
mockWorker1.ref)
+      scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(30)), 
mockWorker2.ref)
+      val request4 = ResourceRequest(Resource(60), WorkerId.unspecified, 
executorNum = 3)
+      scheduler.tell(RequestResource(appId, request4), mockAppMaster.ref)
+      val allocations4 = 
mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated]
+      assert(allocations4.allocations.length == 2)
+      assert(allocations4.allocations.forall(_.resource == Resource(20)))
+
+      // When new resources are available, the remaining request will be 
satisfied
+      scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(40)), 
mockWorker1.ref)
+      val allocations5 = 
mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated]
+      assert(allocations5.allocations.length == 1)
+      assert(allocations4.allocations.forall(_.resource == Resource(20)))
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/test/scala/org/apache/gearpump/cluster/worker/WorkerSpec.scala
----------------------------------------------------------------------
diff --git 
a/daemon/src/test/scala/org/apache/gearpump/cluster/worker/WorkerSpec.scala 
b/daemon/src/test/scala/org/apache/gearpump/cluster/worker/WorkerSpec.scala
new file mode 100644
index 0000000..bf25057
--- /dev/null
+++ b/daemon/src/test/scala/org/apache/gearpump/cluster/worker/WorkerSpec.scala
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster.worker
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+import akka.actor.{ActorSystem, PoisonPill, Props}
+import akka.testkit.TestProbe
+import com.typesafe.config.{Config, ConfigFactory}
+import org.scalatest._
+
+import org.apache.gearpump.cluster.AppMasterToWorker.{ChangeExecutorResource, 
LaunchExecutor, ShutdownExecutor}
+import org.apache.gearpump.cluster.MasterToWorker.{UpdateResourceFailed, 
WorkerRegistered}
+import org.apache.gearpump.cluster.WorkerToAppMaster.{ExecutorLaunchRejected, 
ShutdownExecutorFailed, ShutdownExecutorSucceed}
+import org.apache.gearpump.cluster.WorkerToMaster.{RegisterNewWorker, 
RegisterWorker, ResourceUpdate}
+import org.apache.gearpump.cluster.master.Master.MasterInfo
+import org.apache.gearpump.cluster.scheduler.Resource
+import org.apache.gearpump.cluster.{ExecutorJVMConfig, MasterHarness, TestUtil}
+import org.apache.gearpump.util.{ActorSystemBooter, ActorUtil, Constants}
+
+class WorkerSpec extends WordSpec with Matchers with BeforeAndAfterEach with 
MasterHarness {
+  override def config: Config = TestUtil.DEFAULT_CONFIG
+
+  val appId = 1
+  val workerId: WorkerId = WorkerId(1, 0L)
+  val executorId = 1
+  var masterProxy: TestProbe = null
+  var mockMaster: TestProbe = null
+  var client: TestProbe = null
+  val workerSlots = 50
+
+  override def beforeEach(): Unit = {
+    startActorSystem()
+    mockMaster = TestProbe()(getActorSystem)
+    masterProxy = TestProbe()(getActorSystem)
+    client = TestProbe()(getActorSystem)
+  }
+
+  override def afterEach(): Unit = {
+    shutdownActorSystem()
+  }
+
+  "The new started worker" should {
+    "kill itself if no response from Master after registering" in {
+      val worker = getActorSystem.actorOf(Props(classOf[Worker], 
mockMaster.ref))
+      mockMaster watch worker
+      mockMaster.expectMsg(RegisterNewWorker)
+      mockMaster.expectTerminated(worker, 60.seconds)
+    }
+  }
+
+  "Worker" should {
+    "init its resource from the gearpump config" in {
+      val config = 
ConfigFactory.parseString(s"${Constants.GEARPUMP_WORKER_SLOTS} = $workerSlots").
+        withFallback(TestUtil.DEFAULT_CONFIG)
+      val workerSystem = ActorSystem("WorkerSystem", config)
+      val worker = workerSystem.actorOf(Props(classOf[Worker], mockMaster.ref))
+      mockMaster watch worker
+      mockMaster.expectMsg(RegisterNewWorker)
+
+      worker.tell(WorkerRegistered(workerId, MasterInfo(mockMaster.ref)), 
mockMaster.ref)
+      mockMaster.expectMsg(ResourceUpdate(worker, workerId, 
Resource(workerSlots)))
+
+      worker.tell(
+        UpdateResourceFailed("Test resource update failed", new Exception()), 
mockMaster.ref)
+      mockMaster.expectTerminated(worker, 5.seconds)
+      workerSystem.terminate()
+      Await.result(workerSystem.whenTerminated, Duration.Inf)
+    }
+  }
+
+  "Worker" should {
+    "update its remaining resource when launching and shutting down executors" 
in {
+      val worker = getActorSystem.actorOf(Props(classOf[Worker], 
masterProxy.ref))
+      masterProxy.expectMsg(RegisterNewWorker)
+
+      worker.tell(WorkerRegistered(workerId, MasterInfo(mockMaster.ref)), 
mockMaster.ref)
+      mockMaster.expectMsg(ResourceUpdate(worker, workerId, Resource(100)))
+
+      val executorName = ActorUtil.actorNameForExecutor(appId, executorId)
+      // This is an actor path which the ActorSystemBooter will report back to,
+      // not needed in this test
+      val reportBack = "dummy"
+      val executionContext = ExecutorJVMConfig(Array.empty[String],
+        
getActorSystem.settings.config.getString(Constants.GEARPUMP_APPMASTER_ARGS).split("
 "),
+        classOf[ActorSystemBooter].getName, Array(executorName, reportBack), 
None,
+        username = "user")
+
+      // Test LaunchExecutor
+      worker.tell(LaunchExecutor(appId, executorId, Resource(101), 
executionContext),
+        mockMaster.ref)
+      mockMaster.expectMsg(ExecutorLaunchRejected("There is no free resource 
on this machine"))
+
+      worker.tell(LaunchExecutor(appId, executorId, Resource(5), 
executionContext), mockMaster.ref)
+      mockMaster.expectMsg(ResourceUpdate(worker, workerId, Resource(95)))
+
+      worker.tell(ChangeExecutorResource(appId, executorId, Resource(2)), 
client.ref)
+      mockMaster.expectMsg(ResourceUpdate(worker, workerId, Resource(98)))
+
+      // Test terminationWatch
+      worker.tell(ShutdownExecutor(appId, executorId, "Test shut down 
executor"), client.ref)
+      mockMaster.expectMsg(ResourceUpdate(worker, workerId, Resource(100)))
+      client.expectMsg(ShutdownExecutorSucceed(1, 1))
+
+      worker.tell(ShutdownExecutor(appId, executorId + 1, "Test shut down 
executor"), client.ref)
+      client.expectMsg(ShutdownExecutorFailed(
+        s"Can not find executor ${executorId + 1} for app $appId"))
+
+      mockMaster.ref ! PoisonPill
+      masterProxy.expectMsg(RegisterWorker(workerId))
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/test/scala/org/apache/gearpump/util/FileServerSpec.scala
----------------------------------------------------------------------
diff --git 
a/daemon/src/test/scala/org/apache/gearpump/util/FileServerSpec.scala 
b/daemon/src/test/scala/org/apache/gearpump/util/FileServerSpec.scala
new file mode 100644
index 0000000..b33c801
--- /dev/null
+++ b/daemon/src/test/scala/org/apache/gearpump/util/FileServerSpec.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.util
+
+import java.io.File
+import java.util.concurrent.TimeUnit
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
+import akka.actor.ActorSystem
+import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
+
+import org.apache.gearpump.cluster.TestUtil
+import io.gearpump.google.common.io.Files
+import org.apache.gearpump.jarstore.FilePath
+import org.apache.gearpump.util.FileServer._
+
+class FileServerSpec extends WordSpecLike with Matchers with BeforeAndAfterAll 
{
+
+  implicit val timeout = akka.util.Timeout(25, TimeUnit.SECONDS)
+  val host = "localhost"
+  private val LOG = LogUtil.getLogger(getClass)
+
+  var system: ActorSystem = null
+
+  override def afterAll {
+    if (null != system) {
+      system.terminate()
+      Await.result(system.whenTerminated, Duration.Inf)
+    }
+  }
+
+  override def beforeAll {
+    val config = TestUtil.DEFAULT_CONFIG
+    system = ActorSystem("FileServerSpec", config)
+  }
+
+  private def save(client: Client, data: Array[Byte]): FilePath = {
+    val file = File.createTempFile("fileserverspec", "test")
+    FileUtils.writeByteArrayToFile(file, data)
+    val future = client.upload(file)
+    import scala.concurrent.duration._
+    val path = Await.result(future, 30.seconds)
+    file.delete()
+    path
+  }
+
+  private def get(client: Client, remote: FilePath): Array[Byte] = {
+    val file = File.createTempFile("fileserverspec", "test")
+    val future = client.download(remote, file)
+    import scala.concurrent.duration._
+    val data = Await.result(future, 10.seconds)
+
+    val bytes = FileUtils.readFileToByteArray(file)
+    file.delete()
+    bytes
+  }
+
+  "The file server" should {
+    "serve the data previously stored" in {
+
+      val rootDir = Files.createTempDir()
+
+      val server = new FileServer(system, host, 0, rootDir)
+      val port = Await.result((server.start), Duration(25, TimeUnit.SECONDS))
+
+      LOG.info("start test web server on port " + port)
+
+      val sizes = List(1, 100, 1000000, 50000000)
+      val client = new Client(system, host, port.port)
+
+      sizes.foreach { size =>
+        val bytes = randomBytes(size)
+        val url = s"http://$host:${port.port}/$size";
+        val remote = save(client, bytes)
+        val fetchedBytes = get(client, remote)
+        assert(fetchedBytes sameElements bytes, s"fetch data is coruppted, 
$url, $rootDir")
+      }
+      server.stop
+      rootDir.delete()
+    }
+  }
+
+  "The file server" should {
+    "handle missed file" in {
+
+      val rootDir = Files.createTempDir()
+
+      val server = new FileServer(system, host, 0, rootDir)
+      val port = Await.result((server.start), Duration(25, TimeUnit.SECONDS))
+
+      val client = new Client(system, host, port.port)
+      val fetchedBytes = get(client, FilePath("noexist"))
+      assert(fetchedBytes.length == 0)
+      rootDir.delete()
+    }
+  }
+
+  private def randomBytes(size: Int): Array[Byte] = {
+    val bytes = new Array[Byte](size)
+    (new java.util.Random()).nextBytes(bytes)
+    bytes
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/docs/deployment-configuration.md
----------------------------------------------------------------------
diff --git a/docs/deployment-configuration.md b/docs/deployment-configuration.md
index 70b5500..b651fed 100644
--- a/docs/deployment-configuration.md
+++ b/docs/deployment-configuration.md
@@ -76,7 +76,7 @@ This is the default configuration for `gear.conf`.
 | gearpump.executor.vmargs | "-server -Xss1M -XX:+HeapDumpOnOutOfMemoryError 
-XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=80 -XX:+UseParNewGC 
-XX:NewRatio=3  -Djava.rmi.server.hostname=localhost" | JVM arguments for 
executor |
 | gearpump.executor.extraClasspath | "" | JVM default class path for executor |
 | gearpump.jarstore.rootpath | "jarstore/" |   Define where the submitted jar 
file will be stored. This path follows the hadoop path schema. For HDFS, use 
`hdfs://host:port/path/`; if you want to store on master nodes, then use local 
directory. `jarstore.rootpath = "jarstore/"` will point to relative directory 
where master is started. `jarstore.rootpath = "/jarstore/"` will point to 
absolute directory on master server |
-| gearpump.scheduling.scheduler-class 
|"io.gearpump.cluster.scheduler.PriorityScheduler" | Class to schedule the 
applications. |
+| gearpump.scheduling.scheduler-class 
|"org.apache.gearpump.cluster.scheduler.PriorityScheduler" | Class to schedule 
the applications. |
 | gearpump.services.host | "127.0.0.1" | dashboard UI host address |
 | gearpump.services.port | 8090 | dashboard UI host port |
 | gearpump.netty.buffer-size | 5242880 | netty connection buffer size |
@@ -88,4 +88,4 @@ This is the default configuration for `gear.conf`.
 | gearpump.netty.dispatcher | "gearpump.shared-thread-pool-dispatcher" | 
default dispatcher for netty client and server |
 | gearpump.shared-thread-pool-dispatcher | default Dispatcher with 
"fork-join-executor" | default shared thread pool dispatcher |
 | gearpump.single-thread-dispatcher | PinnedDispatcher | default single thread 
dispatcher |
-| serialization-framework | 
"io.gearpump.serializer.FastKryoSerializationFramework" | Gearpump has built-in 
serialization framework using Kryo. Users are allowed to use a different 
serialization framework, like Protobuf. See 
`io.gearpump.serializer.FastKryoSerializationFramework` to find how a custom 
serialization framework can be defined |
+| serialization-framework | 
"org.apache.gearpump.serializer.FastKryoSerializationFramework" | Gearpump has 
built-in serialization framework using Kryo. Users are allowed to use a 
different serialization framework, like Protobuf. See 
`org.apache.gearpump.serializer.FastKryoSerializationFramework` to find how a 
custom serialization framework can be defined |

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/docs/deployment-resource-isolation.md
----------------------------------------------------------------------
diff --git a/docs/deployment-resource-isolation.md 
b/docs/deployment-resource-isolation.md
index 0cb3764..955b8dd 100644
--- a/docs/deployment-resource-isolation.md
+++ b/docs/deployment-resource-isolation.md
@@ -83,7 +83,7 @@ The following steps are supposed to be executed by root user.
 2. Enter into Gearpump's home folder, edit gear.conf under folder 
```${GEARPUMP_HOME}/conf/```
 
    ```
-   gearpump.worker.executor-process-launcher = 
"io.gearpump.cluster.worker.CGroupProcessLauncher"
+   gearpump.worker.executor-process-launcher = 
"org.apache.gearpump.cluster.worker.CGroupProcessLauncher"
    
    gearpump.cgroup.root = "gearpump"
    ```

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/docs/deployment-ui-authentication.md
----------------------------------------------------------------------
diff --git a/docs/deployment-ui-authentication.md 
b/docs/deployment-ui-authentication.md
index 872a857..614dc15 100644
--- a/docs/deployment-ui-authentication.md
+++ b/docs/deployment-ui-authentication.md
@@ -35,7 +35,7 @@ UI server admin can also choose to enable **auxiliary** 
OAuth2 authentication ch
    Gearpump provides a built-in ConfigFileBasedAuthenticator which verify user 
name and password
    against password hashcode stored in config files.
 
-   However, developer can choose to extends the 
```io.gearpump.security.Authenticator``` to provide a custom
+   However, developers can choose to extend the 
```org.apache.gearpump.security.Authenticator``` to provide a custom
    User-Password based authenticator, to support LDAP, Kerberos, and 
Database-based authentication...
 
 ### ConfigFileBasedAuthenticator: built-in User-Password Authenticator
@@ -59,7 +59,7 @@ Suppose we want to add user jerry as an administrator, here 
are the steps:
    to generate the digest:
    
    ```
-   bin/gear io.gearpump.security.PasswordUtil -password  ilovegearpump
+   bin/gear org.apache.gearpump.security.PasswordUtil -password  ilovegearpump
    ```
    
    It will generate a digest value like this:
@@ -110,8 +110,8 @@ If developer choose to define his/her own User-Password 
based authenticator, it
     modify configuration option:
 
 ```
-## Replace "io.gearpump.security.CustomAuthenticator" with your real 
authenticator class.
-gearpump.ui-security.authenticator = "io.gearpump.security.CustomAuthenticator"
+## Replace "org.apache.gearpump.security.CustomAuthenticator" with your real 
authenticator class.
+gearpump.ui-security.authenticator = 
"org.apache.gearpump.security.CustomAuthenticator"
 ```
 
 Make sure CustomAuthenticator extends interface:
@@ -357,7 +357,7 @@ You can follow the Google OAuth2 example code to define a 
custom OAuth2Authentic
    ```
     ## name of this authenticator
    "socialnetworkx" {
-     "class" = 
"io.gearpump.services.security.oauth2.impl.SocialNetworkXAuthenticator"
+     "class" = 
"org.apache.gearpump.services.security.oauth2.impl.SocialNetworkXAuthenticator"
 
      ## Please make sure this URL matches the name
      "callback" = "http://127.0.0.1:8090/login/oauth2/socialnetworkx/callback";

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/docs/dev-connectors.md
----------------------------------------------------------------------
diff --git a/docs/dev-connectors.md b/docs/dev-connectors.md
index 08ff023..2060c93 100644
--- a/docs/dev-connectors.md
+++ b/docs/dev-connectors.md
@@ -9,7 +9,7 @@ title: Gearpump Connectors
 ### DataSource
 `DataSource` is the concept in Gearpump that without input and will output 
messages. So, basically, `DataSource` is the start point of a streaming 
processing flow.
 
-As Gearpump depends on `DataSource` to be replay-able to ensure at-least-once 
message delivery and exactly-once message delivery, for some data sources, we 
will need a `io.gearpump.streaming.transaction.api.OffsetStorageFactory` to 
store the offset (progress) of current `DataSource`. So that, when a replay is 
needed, Gearpump can guide `DataSource` to replay from certain offset.
+As Gearpump depends on `DataSource` to be replay-able to ensure at-least-once 
message delivery and exactly-once message delivery, for some data sources, we 
will need a 
`org.apache.gearpump.streaming.transaction.api.OffsetStorageFactory` to store 
the offset (progress) of current `DataSource`. So that, when a replay is 
needed, Gearpump can guide `DataSource` to replay from certain offset.
 
 Currently Gearpump `DataSource` only support infinite stream. Finite stream 
support will be added in a near future release.
 
@@ -143,7 +143,7 @@ To implement your own `DataSource`, you need to implement 
two things:
 2. a helper class to ease usage in a DSL
 
 ### Implement your own `DataSource`
-You need to implement a class derived from 
`io.gearpump.streaming.transaction.api.TimeReplayableSource`.
+You need to implement a class derived from 
`org.apache.gearpump.streaming.transaction.api.TimeReplayableSource`.
 
 ### Implement DSL helper (Optional)
 If you would like to have a DSL at hand you may start with this customized 
stream; it is better if you can implement your own DSL helper.
@@ -174,7 +174,7 @@ To implement your own `DataSink`, you need to implement two 
things:
 2. a helper class to make it easy use in DSL
 
 ### Implement your own `DataSink`
-You need to implement a class derived from 
`io.gearpump.streaming.sink.DataSink`.
+You need to implement a class derived from 
`org.apache.gearpump.streaming.sink.DataSink`.
 
 ### Implement DSL helper (Optional)
 If you would like to have a DSL at hand you may start with this customized 
stream; it is better if you can implement your own DSL helper.

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/docs/dev-custom-serializer.md
----------------------------------------------------------------------
diff --git a/docs/dev-custom-serializer.md b/docs/dev-custom-serializer.md
index 5be0544..8e6ba46 100644
--- a/docs/dev-custom-serializer.md
+++ b/docs/dev-custom-serializer.md
@@ -15,10 +15,10 @@ To register a class, you need to change the configuration 
file gear.conf(or appl
 gearpump {
   serializers {
     ## We will use default FieldSerializer to serialize this class type
-    "io.gearpump.UserMessage" = ""
+    "org.apache.gearpump.UserMessage" = ""
     
     ## we will use custom serializer to serialize this class type
-    "io.gearpump.UserMessage2" = "io.gearpump.UserMessageSerializer"
+    "org.apache.gearpump.UserMessage2" = 
"org.apache.gearpump.UserMessageSerializer"
   }
 }
 ```
@@ -27,7 +27,7 @@ gearpump {
 
 When you decide that you want to define a custom serializer, you can do this 
in two ways.
 
-Please note that Gearpump shaded the original Kryo dependency. The package 
name ```com.esotericsoftware``` was relocated to 
```io.gearpump.esotericsoftware```. So in the following customization, you 
should import corresponding shaded classes, the example code will show that 
part.
+Please note that Gearpump shaded the original Kryo dependency. The package 
name ```com.esotericsoftware``` was relocated to 
```org.apache.gearpump.esotericsoftware```. So in the following customization, 
you should import corresponding shaded classes, the example code will show that 
part.
 
 In general you should use the shaded version of a library whenever possible in 
order to avoid binary incompatibilities, eg don't use:
 
@@ -38,7 +38,7 @@ In general you should use the shaded version of a library 
whenever possible in o
 but rather
 
 ```scala
-   import io.gearpump.google.common.io.Files
+   import org.apache.gearpump.google.common.io.Files
 ```
 
 ##### System Level Serializer
@@ -48,10 +48,10 @@ If the serializer is widely used, you can define a global 
serializer which is av
 ###### Step1: you first need to develop a java library which contains the 
custom serializer class. here is an example:
 
 ```scala
-package io.gearpump
+package org.apache.gearpump
 
-import io.gearpump.esotericsoftware.kryo.{Kryo, Serializer}
-import io.gearpump.esotericsoftware.kryo.io.{Input, Output}
+import org.apache.gearpump.esotericsoftware.kryo.{Kryo, Serializer}
+import org.apache.gearpump.esotericsoftware.kryo.io.{Input, Output}
 
 class UserMessage(longField: Long, intField: Int)
 
@@ -78,7 +78,7 @@ Distribute the jar file to lib/ folder of every Gearpump 
installation in the clu
 ```
 gearpump {
   serializers {
-    "io.gearpump.UserMessage" = "io.gearpump.UserMessageSerializer"
+    "org.apache.gearpump.UserMessage" = 
"org.apache.gearpump.UserMessageSerializer"
   }
 }
 ```
@@ -93,10 +93,10 @@ If all you want is to define an application level 
serializer, which is only visi
 You should include the Serializer class in your application jar. Here is an 
example to define a custom serializer:
 
 ```scala
-package io.gearpump
+package org.apache.gearpump
 
-import io.gearpump.esotericsoftware.kryo.{Kryo, Serializer}
-import io.gearpump.esotericsoftware.kryo.io.{Input, Output}
+import org.apache.gearpump.esotericsoftware.kryo.{Kryo, Serializer}
+import org.apache.gearpump.esotericsoftware.kryo.io.{Input, Output}
 
 class UserMessage(longField: Long, intField: Int)
 
@@ -120,7 +120,7 @@ class UserMessageSerializer extends Serializer[UserMessage] 
{
 ### content of application.conf
 gearpump {
   serializers {
-    "io.gearpump.UserMessage" = "io.gearpump.UserMessageSerializer"
+    "org.apache.gearpump.UserMessage" = 
"org.apache.gearpump.UserMessageSerializer"
   }
 }
 ```
@@ -136,7 +136,7 @@ There are other serialization framework besides Kryo, like 
Protobuf. If user don
 basically, user need to define in gear.conf(or application.conf for single 
application's scope) file like this:
 
 ```bash
-gearpump.serialization-framework = 
"io.gearpump.serializer.CustomSerializationFramework"
+gearpump.serialization-framework = 
"org.apache.gearpump.serializer.CustomSerializationFramework"
 ```
 
 Please find an example in gearpump storm module, search 
"StormSerializationFramework" in source code.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/docs/dev-non-streaming-example.md
----------------------------------------------------------------------
diff --git a/docs/dev-non-streaming-example.md 
b/docs/dev-non-streaming-example.md
index 1aec2d1..8b859dc 100644
--- a/docs/dev-non-streaming-example.md
+++ b/docs/dev-non-streaming-example.md
@@ -3,7 +3,7 @@ layout: global
 title: Gearpump Non-Streaming Example
 ---
 
-We'll use [Distributed 
Shell](https://github.com/gearpump/gearpump/tree/master/examples/distributedshell/src/main/scala/io/gearpump/examples/distributedshell)
 as an example to illustrate how to do that.
+We'll use [Distributed 
Shell](https://github.com/gearpump/gearpump/tree/master/examples/distributedshell/src/main/scala/org/apache/gearpump/examples/distributedshell)
 as an example to illustrate how to do that.
 
 What Distributed Shell do is that user send a shell command to the cluster and 
the command will the executed on each node, then the result will be return to 
user.
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/docs/dev-rest-api.md
----------------------------------------------------------------------
diff --git a/docs/dev-rest-api.md b/docs/dev-rest-api.md
index 6243a89..161a09f 100644
--- a/docs/dev-rest-api.md
+++ b/docs/dev-rest-api.md
@@ -262,36 +262,36 @@ Sample Response:
 :
     [{
         "time": "1450758725070",
-        "value": {"$type": "io.gearpump.metrics.Metrics.Gauge", "name": 
"master:memory.heap.used", "value": "59764272"}
+        "value": {"$type": "org.apache.gearpump.metrics.Metrics.Gauge", 
"name": "master:memory.heap.used", "value": "59764272"}
     }, {
         "time": "1450758725070",
-        "value": {"$type": "io.gearpump.metrics.Metrics.Gauge", "name": 
"master:thread.daemon.count", "value": "18"}
+        "value": {"$type": "org.apache.gearpump.metrics.Metrics.Gauge", 
"name": "master:thread.daemon.count", "value": "18"}
     }, {
         "time": "1450758725070",
         "value": {
-            "$type": "io.gearpump.metrics.Metrics.Gauge",
+            "$type": "org.apache.gearpump.metrics.Metrics.Gauge",
             "name": "master:memory.total.committed",
             "value": "210239488"
         }
     }, {
         "time": "1450758725070",
-        "value": {"$type": "io.gearpump.metrics.Metrics.Gauge", "name": 
"master:memory.heap.max", "value": "880017408"}
+        "value": {"$type": "org.apache.gearpump.metrics.Metrics.Gauge", 
"name": "master:memory.heap.max", "value": "880017408"}
     }, {
         "time": "1450758725070",
-        "value": {"$type": "io.gearpump.metrics.Metrics.Gauge", "name": 
"master:memory.total.max", "value": "997457920"}
+        "value": {"$type": "org.apache.gearpump.metrics.Metrics.Gauge", 
"name": "master:memory.total.max", "value": "997457920"}
     }, {
         "time": "1450758725070",
         "value": {
-            "$type": "io.gearpump.metrics.Metrics.Gauge",
+            "$type": "org.apache.gearpump.metrics.Metrics.Gauge",
             "name": "master:memory.heap.committed",
             "value": "179830784"
         }
     }, {
         "time": "1450758725070",
-        "value": {"$type": "io.gearpump.metrics.Metrics.Gauge", "name": 
"master:memory.total.used", "value": "89117352"}
+        "value": {"$type": "org.apache.gearpump.metrics.Metrics.Gauge", 
"name": "master:memory.total.used", "value": "89117352"}
     }, {
         "time": "1450758725070",
-        "value": {"$type": "io.gearpump.metrics.Metrics.Gauge", "name": 
"master:thread.count", "value": "28"}
+        "value": {"$type": "org.apache.gearpump.metrics.Metrics.Gauge", 
"name": "master:thread.count", "value": "28"}
     }]
 }
 ```
@@ -471,88 +471,88 @@ Sample Response:
     [{
         "time": "1450759137860",
         "value": {
-            "$type": "io.gearpump.metrics.Metrics.Gauge",
+            "$type": "org.apache.gearpump.metrics.Metrics.Gauge",
             "name": "worker1:memory.total.used",
             "value": "152931440"
         }
     }, {
         "time": "1450759137860",
-        "value": {"$type": "io.gearpump.metrics.Metrics.Gauge", "name": 
"worker1:thread.daemon.count", "value": "18"}
+        "value": {"$type": "org.apache.gearpump.metrics.Metrics.Gauge", 
"name": "worker1:thread.daemon.count", "value": "18"}
     }, {
         "time": "1450759137860",
         "value": {
-            "$type": "io.gearpump.metrics.Metrics.Gauge",
+            "$type": "org.apache.gearpump.metrics.Metrics.Gauge",
             "name": "worker0:memory.heap.used",
             "value": "123139640"
         }
     }, {
         "time": "1450759137860",
         "value": {
-            "$type": "io.gearpump.metrics.Metrics.Gauge",
+            "$type": "org.apache.gearpump.metrics.Metrics.Gauge",
             "name": "worker0:memory.total.max",
             "value": "997457920"
         }
     }, {
         "time": "1450759137860",
         "value": {
-            "$type": "io.gearpump.metrics.Metrics.Gauge",
+            "$type": "org.apache.gearpump.metrics.Metrics.Gauge",
             "name": "worker0:memory.heap.committed",
             "value": "179830784"
         }
     }, {
         "time": "1450759137860",
-        "value": {"$type": "io.gearpump.metrics.Metrics.Gauge", "name": 
"worker0:thread.count", "value": "28"}
+        "value": {"$type": "org.apache.gearpump.metrics.Metrics.Gauge", 
"name": "worker0:thread.count", "value": "28"}
     }, {
         "time": "1450759137860",
-        "value": {"$type": "io.gearpump.metrics.Metrics.Gauge", "name": 
"worker0:memory.heap.max", "value": "880017408"}
+        "value": {"$type": "org.apache.gearpump.metrics.Metrics.Gauge", 
"name": "worker0:memory.heap.max", "value": "880017408"}
     }, {
         "time": "1450759137860",
-        "value": {"$type": "io.gearpump.metrics.Metrics.Gauge", "name": 
"worker1:memory.heap.max", "value": "880017408"}
+        "value": {"$type": "org.apache.gearpump.metrics.Metrics.Gauge", 
"name": "worker1:memory.heap.max", "value": "880017408"}
     }, {
         "time": "1450759137860",
         "value": {
-            "$type": "io.gearpump.metrics.Metrics.Gauge",
+            "$type": "org.apache.gearpump.metrics.Metrics.Gauge",
             "name": "worker0:memory.total.committed",
             "value": "210239488"
         }
     }, {
         "time": "1450759137860",
         "value": {
-            "$type": "io.gearpump.metrics.Metrics.Gauge",
+            "$type": "org.apache.gearpump.metrics.Metrics.Gauge",
             "name": "worker0:memory.total.used",
             "value": "152931440"
         }
     }, {
         "time": "1450759137860",
-        "value": {"$type": "io.gearpump.metrics.Metrics.Gauge", "name": 
"worker1:thread.count", "value": "28"}
+        "value": {"$type": "org.apache.gearpump.metrics.Metrics.Gauge", 
"name": "worker1:thread.count", "value": "28"}
     }, {
         "time": "1450759137860",
         "value": {
-            "$type": "io.gearpump.metrics.Metrics.Gauge",
+            "$type": "org.apache.gearpump.metrics.Metrics.Gauge",
             "name": "worker1:memory.total.max",
             "value": "997457920"
         }
     }, {
         "time": "1450759137860",
         "value": {
-            "$type": "io.gearpump.metrics.Metrics.Gauge",
+            "$type": "org.apache.gearpump.metrics.Metrics.Gauge",
             "name": "worker1:memory.heap.committed",
             "value": "179830784"
         }
     }, {
         "time": "1450759137860",
         "value": {
-            "$type": "io.gearpump.metrics.Metrics.Gauge",
+            "$type": "org.apache.gearpump.metrics.Metrics.Gauge",
             "name": "worker1:memory.total.committed",
             "value": "210239488"
         }
     }, {
         "time": "1450759137860",
-        "value": {"$type": "io.gearpump.metrics.Metrics.Gauge", "name": 
"worker0:thread.daemon.count", "value": "18"}
+        "value": {"$type": "org.apache.gearpump.metrics.Metrics.Gauge", 
"name": "worker0:thread.daemon.count", "value": "18"}
     }, {
         "time": "1450759137860",
         "value": {
-            "$type": "io.gearpump.metrics.Metrics.Gauge",
+            "$type": "org.apache.gearpump.metrics.Metrics.Gauge",
             "name": "worker1:memory.heap.used",
             "value": "123139640"
         }
@@ -654,7 +654,7 @@ Sample Response:
       0,
       {
         "id": 0,
-        "taskClass": "io.gearpump.streaming.examples.wordcount.Split",
+        "taskClass": "org.apache.gearpump.streaming.examples.wordcount.Split",
         "parallelism": 1,
         "description": "",
         "taskConf": {
@@ -681,7 +681,7 @@ Sample Response:
       1,
       {
         "id": 1,
-        "taskClass": "io.gearpump.streaming.examples.wordcount.Sum",
+        "taskClass": "org.apache.gearpump.streaming.examples.wordcount.Sum",
         "parallelism": 1,
         "description": "",
         "taskConf": {
@@ -723,7 +723,7 @@ Sample Response:
     "edgeList": [
       [
         0,
-        "io.gearpump.partitioner.HashPartitioner",
+        "org.apache.gearpump.partitioner.HashPartitioner",
         1
       ]
     ]
@@ -856,7 +856,7 @@ Sample Response:
         },
         "netty-dispatcher" : "akka.actor.default-dispatcher",
         "scheduling" : {
-            "scheduler-class" : 
"io.gearpump.cluster.scheduler.PriorityScheduler"
+            "scheduler-class" : 
"org.apache.gearpump.cluster.scheduler.PriorityScheduler"
         },
         "serializers" : {
             "[B" : "",
@@ -868,11 +868,11 @@ Sample Response:
             "[Ljava.lang.String;" : "",
             "[S" : "",
             "[Z" : "",
-            "io.gearpump.Message" : "io.gearpump.streaming.MessageSerializer",
-            "io.gearpump.streaming.task.Ack" : 
"io.gearpump.streaming.AckSerializer",
-            "io.gearpump.streaming.task.AckRequest" : 
"io.gearpump.streaming.AckRequestSerializer",
-            "io.gearpump.streaming.task.LatencyProbe" : 
"io.gearpump.streaming.LatencyProbeSerializer",
-            "io.gearpump.streaming.task.TaskId" : 
"io.gearpump.streaming.TaskIdSerializer",
+            "org.apache.gearpump.Message" : 
"org.apache.gearpump.streaming.MessageSerializer",
+            "org.apache.gearpump.streaming.task.Ack" : 
"org.apache.gearpump.streaming.AckSerializer",
+            "org.apache.gearpump.streaming.task.AckRequest" : 
"org.apache.gearpump.streaming.AckRequestSerializer",
+            "org.apache.gearpump.streaming.task.LatencyProbe" : 
"org.apache.gearpump.streaming.LatencyProbeSerializer",
+            "org.apache.gearpump.streaming.task.TaskId" : 
"org.apache.gearpump.streaming.TaskIdSerializer",
             "scala.Tuple1" : "",
             "scala.Tuple2" : "",
             "scala.Tuple3" : "",
@@ -910,7 +910,7 @@ aggregator points to a aggregator class, which will 
aggregate on the current met
 Example:
 
 ```bash
-curl [--cookie outputAuthenticationCookie.txt] 
http://127.0.0.1:8090/api/v1.0/appmaster/1/metrics/app1?readLatest=true&aggregator=io.gearpump.streaming.metrics.ProcessorAggregator
+curl [--cookie outputAuthenticationCookie.txt] 
http://127.0.0.1:8090/api/v1.0/appmaster/1/metrics/app1?readLatest=true&aggregator=org.apache.gearpump.streaming.metrics.ProcessorAggregator
 ```
 
 Sample Response:
@@ -924,88 +924,88 @@ Sample Response:
     [{
         "time": "1450759137860",
         "value": {
-            "$type": "io.gearpump.metrics.Metrics.Gauge",
+            "$type": "org.apache.gearpump.metrics.Metrics.Gauge",
             "name": "worker1:memory.total.used",
             "value": "152931440"
         }
     }, {
         "time": "1450759137860",
-        "value": {"$type": "io.gearpump.metrics.Metrics.Gauge", "name": 
"worker1:thread.daemon.count", "value": "18"}
+        "value": {"$type": "org.apache.gearpump.metrics.Metrics.Gauge", 
"name": "worker1:thread.daemon.count", "value": "18"}
     }, {
         "time": "1450759137860",
         "value": {
-            "$type": "io.gearpump.metrics.Metrics.Gauge",
+            "$type": "org.apache.gearpump.metrics.Metrics.Gauge",
             "name": "worker0:memory.heap.used",
             "value": "123139640"
         }
     }, {
         "time": "1450759137860",
         "value": {
-            "$type": "io.gearpump.metrics.Metrics.Gauge",
+            "$type": "org.apache.gearpump.metrics.Metrics.Gauge",
             "name": "worker0:memory.total.max",
             "value": "997457920"
         }
     }, {
         "time": "1450759137860",
         "value": {
-            "$type": "io.gearpump.metrics.Metrics.Gauge",
+            "$type": "org.apache.gearpump.metrics.Metrics.Gauge",
             "name": "worker0:memory.heap.committed",
             "value": "179830784"
         }
     }, {
         "time": "1450759137860",
-        "value": {"$type": "io.gearpump.metrics.Metrics.Gauge", "name": 
"worker0:thread.count", "value": "28"}
+        "value": {"$type": "org.apache.gearpump.metrics.Metrics.Gauge", 
"name": "worker0:thread.count", "value": "28"}
     }, {
         "time": "1450759137860",
-        "value": {"$type": "io.gearpump.metrics.Metrics.Gauge", "name": 
"worker0:memory.heap.max", "value": "880017408"}
+        "value": {"$type": "org.apache.gearpump.metrics.Metrics.Gauge", 
"name": "worker0:memory.heap.max", "value": "880017408"}
     }, {
         "time": "1450759137860",
-        "value": {"$type": "io.gearpump.metrics.Metrics.Gauge", "name": 
"worker1:memory.heap.max", "value": "880017408"}
+        "value": {"$type": "org.apache.gearpump.metrics.Metrics.Gauge", 
"name": "worker1:memory.heap.max", "value": "880017408"}
     }, {
         "time": "1450759137860",
         "value": {
-            "$type": "io.gearpump.metrics.Metrics.Gauge",
+            "$type": "org.apache.gearpump.metrics.Metrics.Gauge",
             "name": "worker0:memory.total.committed",
             "value": "210239488"
         }
     }, {
         "time": "1450759137860",
         "value": {
-            "$type": "io.gearpump.metrics.Metrics.Gauge",
+            "$type": "org.apache.gearpump.metrics.Metrics.Gauge",
             "name": "worker0:memory.total.used",
             "value": "152931440"
         }
     }, {
         "time": "1450759137860",
-        "value": {"$type": "io.gearpump.metrics.Metrics.Gauge", "name": 
"worker1:thread.count", "value": "28"}
+        "value": {"$type": "org.apache.gearpump.metrics.Metrics.Gauge", 
"name": "worker1:thread.count", "value": "28"}
     }, {
         "time": "1450759137860",
         "value": {
-            "$type": "io.gearpump.metrics.Metrics.Gauge",
+            "$type": "org.apache.gearpump.metrics.Metrics.Gauge",
             "name": "worker1:memory.total.max",
             "value": "997457920"
         }
     }, {
         "time": "1450759137860",
         "value": {
-            "$type": "io.gearpump.metrics.Metrics.Gauge",
+            "$type": "org.apache.gearpump.metrics.Metrics.Gauge",
             "name": "worker1:memory.heap.committed",
             "value": "179830784"
         }
     }, {
         "time": "1450759137860",
         "value": {
-            "$type": "io.gearpump.metrics.Metrics.Gauge",
+            "$type": "org.apache.gearpump.metrics.Metrics.Gauge",
             "name": "worker1:memory.total.committed",
             "value": "210239488"
         }
     }, {
         "time": "1450759137860",
-        "value": {"$type": "io.gearpump.metrics.Metrics.Gauge", "name": 
"worker0:thread.daemon.count", "value": "18"}
+        "value": {"$type": "org.apache.gearpump.metrics.Metrics.Gauge", 
"name": "worker0:thread.daemon.count", "value": "18"}
     }, {
         "time": "1450759137860",
         "value": {
-            "$type": "io.gearpump.metrics.Metrics.Gauge",
+            "$type": "org.apache.gearpump.metrics.Metrics.Gauge",
             "name": "worker1:memory.heap.used",
             "value": "123139640"
         }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/docs/dev-write-1st-app.md
----------------------------------------------------------------------
diff --git a/docs/dev-write-1st-app.md b/docs/dev-write-1st-app.md
index e768397..e125f3d 100644
--- a/docs/dev-write-1st-app.md
+++ b/docs/dev-write-1st-app.md
@@ -5,7 +5,7 @@ title: Write Your 1st Gearpump App
 description: Write Your 1st Gearpump App
 ---
 
-We'll use 
[wordcount](https://github.com/gearpump/gearpump/blob/master/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/)
 as an example to illustrate how to write Gearpump applications.
+We'll use 
[wordcount](https://github.com/gearpump/gearpump/blob/master/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/)
 as an example to illustrate how to write Gearpump applications.
 
 ### Maven/Sbt Settings
 
@@ -17,9 +17,9 @@ You can get your preferred IDE ready for Gearpump by 
following [this guide](dev-
 ### Decide which language and API to use for writing 
 Gearpump supports two level APIs:
 
-1. Low level API, which is more similar to Akka programming, operating on each 
event. The API document can be found at [Low Level API 
Doc](http://www.gearpump.io/releases/latest/api/scala/index.html#io.gearpump.streaming.package).
+1. Low level API, which is more similar to Akka programming, operating on each 
event. The API document can be found at [Low Level API 
Doc](http://www.gearpump.io/releases/latest/api/scala/index.html#org.apache.gearpump.streaming.package).
 
-2. High level API (aka DSL), which is operating on streaming instead of 
individual event. The API document can be found at [DSL API 
Doc](http://www.gearpump.io/releases/latest/api/scala/index.html#io.gearpump.streaming.dsl.package).
+2. High level API (aka DSL), which is operating on streaming instead of 
individual event. The API document can be found at [DSL API 
Doc](http://www.gearpump.io/releases/latest/api/scala/index.html#org.apache.gearpump.streaming.dsl.package).
 
 And both APIs have their Java version and Scala version.
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/docs/submit-your-1st-application.md
----------------------------------------------------------------------
diff --git a/docs/submit-your-1st-application.md 
b/docs/submit-your-1st-application.md
index b72329d..3bdb2f8 100644
--- a/docs/submit-your-1st-application.md
+++ b/docs/submit-your-1st-application.md
@@ -26,7 +26,7 @@ Open another shell,
 
 ```bash
 ### To run WordCount example
-bin/gear app -jar 
examples/wordcount-{{site.SCALA_BINARY_VERSION}}-{{site.GEARPUMP_VERSION}}-assembly.jar
 io.gearpump.streaming.examples.wordcount.WordCount
+bin/gear app -jar 
examples/wordcount-{{site.SCALA_BINARY_VERSION}}-{{site.GEARPUMP_VERSION}}-assembly.jar
 org.apache.gearpump.streaming.examples.wordcount.WordCount
 ```
 
 ###  Step 2: Congratulations, you've submitted your first application.

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/distributedshell/README.md
----------------------------------------------------------------------
diff --git a/examples/distributedshell/README.md 
b/examples/distributedshell/README.md
index f21e87e..437960a 100644
--- a/examples/distributedshell/README.md
+++ b/examples/distributedshell/README.md
@@ -8,10 +8,10 @@ In order to run the example:
 
   2. Start the AppMaster:<br>
   ```bash
-  target/pack/bin/gear app -jar 
target/pack/examples/gearpump-experiments-distributedshell_$VERSION.jar 
io.gearpump.examples.distributedshell.DistributedShell
+  target/pack/bin/gear app -jar 
target/pack/examples/gearpump-experiments-distributedshell_$VERSION.jar 
org.apache.gearpump.examples.distributedshell.DistributedShell
   ```
 
   3. Submit the shell command:<br>
   ```bash
-  target/pack/bin/gear app -verbose true -jar 
target/pack/examples/gearpump-experiments-distributedshell_$VERSION.jar 
io.gearpump.examples.distributedshell.DistributedShellClient -appid $APPID 
-command "ls /"
+  target/pack/bin/gear app -verbose true -jar 
target/pack/examples/gearpump-experiments-distributedshell_$VERSION.jar 
org.apache.gearpump.examples.distributedshell.DistributedShellClient -appid 
$APPID -command "ls /"
   ```
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/distributedshell/src/main/scala/io/gearpump/examples/distributedshell/DistShellAppMaster.scala
----------------------------------------------------------------------
diff --git 
a/examples/distributedshell/src/main/scala/io/gearpump/examples/distributedshell/DistShellAppMaster.scala
 
b/examples/distributedshell/src/main/scala/io/gearpump/examples/distributedshell/DistShellAppMaster.scala
deleted file mode 100644
index 9b417c0..0000000
--- 
a/examples/distributedshell/src/main/scala/io/gearpump/examples/distributedshell/DistShellAppMaster.scala
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.examples.distributedshell
-
-import scala.concurrent.Future
-
-import akka.actor.{Deploy, Props}
-import akka.pattern.{ask, pipe}
-import akka.remote.RemoteScope
-import com.typesafe.config.Config
-import org.slf4j.Logger
-
-import io.gearpump.cluster.ClientToMaster.ShutdownApplication
-import 
io.gearpump.cluster.appmaster.ExecutorSystemScheduler.{ExecutorSystemJvmConfig, 
ExecutorSystemStarted, StartExecutorSystemTimeout}
-import io.gearpump.cluster.{AppDescription, AppMasterContext, 
ApplicationMaster, ExecutorContext}
-import io.gearpump.examples.distributedshell.DistShellAppMaster._
-import io.gearpump.util.{ActorUtil, Constants, LogUtil, Util}
-
-class DistShellAppMaster(appContext: AppMasterContext, app: AppDescription)
-  extends ApplicationMaster {
-
-  import appContext._
-  import context.dispatcher
-  implicit val timeout = Constants.FUTURE_TIMEOUT
-  private val LOG: Logger = LogUtil.getLogger(getClass, app = appId)
-  protected var currentExecutorId = 0
-
-  override def preStart(): Unit = {
-    LOG.info(s"Distributed Shell AppMaster started")
-    ActorUtil.launchExecutorOnEachWorker(masterProxy, getExecutorJvmConfig, 
self)
-  }
-
-  override def receive: Receive = {
-    case ExecutorSystemStarted(executorSystem, _) =>
-      import executorSystem.{address, resource => executorResource, worker}
-      val executorContext = ExecutorContext(currentExecutorId, worker, appId, 
app.name,
-        self, executorResource)
-      // Start executor
-      val executor = context.actorOf(Props(classOf[ShellExecutor], 
executorContext, app.userConfig)
-        .withDeploy(Deploy(scope = RemoteScope(address))), 
currentExecutorId.toString)
-      executorSystem.bindLifeCycleWith(executor)
-      currentExecutorId += 1
-    case StartExecutorSystemTimeout =>
-      LOG.error(s"Failed to allocate resource in time")
-      masterProxy ! ShutdownApplication(appId)
-      context.stop(self)
-    case msg: ShellCommand =>
-      Future.fold(context.children.map(_ ? msg))(new 
ShellCommandResultAggregator) {
-        (aggregator, response) => {
-          aggregator.aggregate(response.asInstanceOf[ShellCommandResult])
-        }
-      }.map(_.toString()) pipeTo sender
-  }
-
-  private def getExecutorJvmConfig: ExecutorSystemJvmConfig = {
-    val config: Config = app.clusterConfig
-    val jvmSetting = 
Util.resolveJvmSetting(config.withFallback(context.system.settings.config))
-      .executor
-    ExecutorSystemJvmConfig(jvmSetting.classPath, jvmSetting.vmargs,
-      appJar, username, config)
-  }
-}
-
-object DistShellAppMaster {
-  case class ShellCommand(command: String)
-
-  case class ShellCommandResult(executorId: Int, msg: Any)
-
-  class ShellCommandResultAggregator {
-    val result: StringBuilder = new StringBuilder
-
-    def aggregate(response: ShellCommandResult): ShellCommandResultAggregator 
= {
-      result.append(s"Execute results from executor ${response.executorId} : 
\n")
-      result.append(response.msg + "\n")
-      this
-    }
-
-    override def toString(): String = result.toString()
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/distributedshell/src/main/scala/io/gearpump/examples/distributedshell/DistributedShell.scala
----------------------------------------------------------------------
diff --git 
a/examples/distributedshell/src/main/scala/io/gearpump/examples/distributedshell/DistributedShell.scala
 
b/examples/distributedshell/src/main/scala/io/gearpump/examples/distributedshell/DistributedShell.scala
deleted file mode 100644
index b7b3eb0..0000000
--- 
a/examples/distributedshell/src/main/scala/io/gearpump/examples/distributedshell/DistributedShell.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.examples.distributedshell
-
-import org.slf4j.Logger
-
-import io.gearpump.cluster.client.ClientContext
-import io.gearpump.cluster.main.{ArgumentsParser, CLIOption}
-import io.gearpump.cluster.{Application, UserConfig}
-import io.gearpump.util.{AkkaApp, LogUtil}
-
-/** Demo application to distribute and execute shell command on all machines 
of the cluster */
-object DistributedShell extends AkkaApp with ArgumentsParser {
-  private val LOG: Logger = LogUtil.getLogger(getClass)
-
-  override val options: Array[(String, CLIOption[Any])] = Array.empty
-
-  override def main(akkaConf: Config, args: Array[String]): Unit = {
-    LOG.info(s"Distributed shell submitting application...")
-    val context = ClientContext(akkaConf)
-    val appId = 
context.submit(Application[DistShellAppMaster]("DistributedShell",
-    UserConfig.empty))
-    context.close()
-    LOG.info(s"Distributed Shell Application started with appId $appId !")
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/distributedshell/src/main/scala/io/gearpump/examples/distributedshell/DistributedShellClient.scala
----------------------------------------------------------------------
diff --git 
a/examples/distributedshell/src/main/scala/io/gearpump/examples/distributedshell/DistributedShellClient.scala
 
b/examples/distributedshell/src/main/scala/io/gearpump/examples/distributedshell/DistributedShellClient.scala
deleted file mode 100644
index cd6f943..0000000
--- 
a/examples/distributedshell/src/main/scala/io/gearpump/examples/distributedshell/DistributedShellClient.scala
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.examples.distributedshell
-
-import java.util.concurrent.TimeUnit
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-
-import akka.pattern.ask
-import org.slf4j.{Logger, LoggerFactory}
-
-import io.gearpump.cluster.client.ClientContext
-import io.gearpump.cluster.main.{ArgumentsParser, CLIOption}
-import io.gearpump.examples.distributedshell.DistShellAppMaster.ShellCommand
-import io.gearpump.util.{AkkaApp, Constants}
-
-/** Client to DistributedShell to input "shell command" */
-object DistributedShellClient extends AkkaApp with ArgumentsParser {
-  implicit val timeout = Constants.FUTURE_TIMEOUT
-  private val LOG: Logger = LoggerFactory.getLogger(getClass)
-
-  override val options: Array[(String, CLIOption[Any])] = Array(
-    "appid" -> CLIOption[Int]("<the distributed shell appid>", required = 
true),
-    "command" -> CLIOption[String]("<shell command>", required = true)
-  )
-
-  override def main(akkaConf: Config, args: Array[String]): Unit = {
-    val config = parse(args)
-    val context = ClientContext(akkaConf)
-    implicit val system = context.system
-    implicit val dispatcher = system.dispatcher
-    val appid = config.getInt("appid")
-    val command = config.getString("command")
-    val appMaster = context.resolveAppID(appid)
-    LOG.info(s"Resolved appMaster $appid address $appMaster, sending command 
$command")
-    val future = (appMaster ? ShellCommand(command)).map { result =>
-      LOG.info(s"Result: \n$result")
-      context.close()
-    }
-    Await.ready(future, Duration(60, TimeUnit.SECONDS))
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/distributedshell/src/main/scala/io/gearpump/examples/distributedshell/ShellExecutor.scala
----------------------------------------------------------------------
diff --git 
a/examples/distributedshell/src/main/scala/io/gearpump/examples/distributedshell/ShellExecutor.scala
 
b/examples/distributedshell/src/main/scala/io/gearpump/examples/distributedshell/ShellExecutor.scala
deleted file mode 100644
index 2d0fd06..0000000
--- 
a/examples/distributedshell/src/main/scala/io/gearpump/examples/distributedshell/ShellExecutor.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.examples.distributedshell
-
-import scala.sys.process._
-import scala.util.{Failure, Success, Try}
-
-import akka.actor.Actor
-import org.slf4j.Logger
-
-import io.gearpump.cluster.{ExecutorContext, UserConfig}
-import io.gearpump.examples.distributedshell.DistShellAppMaster.{ShellCommand, 
ShellCommandResult}
-import io.gearpump.util.LogUtil
-
-/** Executor actor on remote machine */
-class ShellExecutor(executorContext: ExecutorContext, userConf: UserConfig) 
extends Actor {
-  import executorContext._
-  private val LOG: Logger = LogUtil.getLogger(getClass, executor = executorId, 
app = appId)
-
-  LOG.info(s"ShellExecutor started!")
-
-  override def receive: Receive = {
-    case ShellCommand(command) =>
-      val process = Try(s"$command".!!)
-      val result = process match {
-        case Success(msg) => msg
-        case Failure(ex) => ex.getMessage
-      }
-      sender ! ShellCommandResult(executorId, result)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/distributedshell/src/main/scala/org/apache/gearpump/examples/distributedshell/DistShellAppMaster.scala
----------------------------------------------------------------------
diff --git 
a/examples/distributedshell/src/main/scala/org/apache/gearpump/examples/distributedshell/DistShellAppMaster.scala
 
b/examples/distributedshell/src/main/scala/org/apache/gearpump/examples/distributedshell/DistShellAppMaster.scala
new file mode 100644
index 0000000..cd0b18b
--- /dev/null
+++ 
b/examples/distributedshell/src/main/scala/org/apache/gearpump/examples/distributedshell/DistShellAppMaster.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.examples.distributedshell
+
+import scala.concurrent.Future
+
+import akka.actor.{Deploy, Props}
+import akka.pattern.{ask, pipe}
+import akka.remote.RemoteScope
+import com.typesafe.config.Config
+import org.slf4j.Logger
+
+import org.apache.gearpump.cluster.ClientToMaster.ShutdownApplication
+import 
org.apache.gearpump.cluster.appmaster.ExecutorSystemScheduler.{ExecutorSystemJvmConfig,
 ExecutorSystemStarted, StartExecutorSystemTimeout}
+import org.apache.gearpump.cluster.{AppDescription, AppMasterContext, 
ApplicationMaster, ExecutorContext}
+import org.apache.gearpump.examples.distributedshell.DistShellAppMaster._
+import org.apache.gearpump.util.{ActorUtil, Constants, LogUtil, Util}
+
+class DistShellAppMaster(appContext: AppMasterContext, app: AppDescription)
+  extends ApplicationMaster {
+
+  import appContext._
+  import context.dispatcher
+  implicit val timeout = Constants.FUTURE_TIMEOUT
+  private val LOG: Logger = LogUtil.getLogger(getClass, app = appId)
+  protected var currentExecutorId = 0
+
+  override def preStart(): Unit = {
+    LOG.info(s"Distributed Shell AppMaster started")
+    ActorUtil.launchExecutorOnEachWorker(masterProxy, getExecutorJvmConfig, 
self)
+  }
+
+  override def receive: Receive = {
+    case ExecutorSystemStarted(executorSystem, _) =>
+      import executorSystem.{address, resource => executorResource, worker}
+      val executorContext = ExecutorContext(currentExecutorId, worker, appId, 
app.name,
+        self, executorResource)
+      // Start executor
+      val executor = context.actorOf(Props(classOf[ShellExecutor], 
executorContext, app.userConfig)
+        .withDeploy(Deploy(scope = RemoteScope(address))), 
currentExecutorId.toString)
+      executorSystem.bindLifeCycleWith(executor)
+      currentExecutorId += 1
+    case StartExecutorSystemTimeout =>
+      LOG.error(s"Failed to allocate resource in time")
+      masterProxy ! ShutdownApplication(appId)
+      context.stop(self)
+    case msg: ShellCommand =>
+      Future.fold(context.children.map(_ ? msg))(new 
ShellCommandResultAggregator) {
+        (aggregator, response) => {
+          aggregator.aggregate(response.asInstanceOf[ShellCommandResult])
+        }
+      }.map(_.toString()) pipeTo sender
+  }
+
+  private def getExecutorJvmConfig: ExecutorSystemJvmConfig = {
+    val config: Config = app.clusterConfig
+    val jvmSetting = 
Util.resolveJvmSetting(config.withFallback(context.system.settings.config))
+      .executor
+    ExecutorSystemJvmConfig(jvmSetting.classPath, jvmSetting.vmargs,
+      appJar, username, config)
+  }
+}
+
+object DistShellAppMaster {
+  case class ShellCommand(command: String)
+
+  case class ShellCommandResult(executorId: Int, msg: Any)
+
+  class ShellCommandResultAggregator {
+    val result: StringBuilder = new StringBuilder
+
+    def aggregate(response: ShellCommandResult): ShellCommandResultAggregator 
= {
+      result.append(s"Execute results from executor ${response.executorId} : 
\n")
+      result.append(response.msg + "\n")
+      this
+    }
+
+    override def toString(): String = result.toString()
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/distributedshell/src/main/scala/org/apache/gearpump/examples/distributedshell/DistributedShell.scala
----------------------------------------------------------------------
diff --git 
a/examples/distributedshell/src/main/scala/org/apache/gearpump/examples/distributedshell/DistributedShell.scala
 
b/examples/distributedshell/src/main/scala/org/apache/gearpump/examples/distributedshell/DistributedShell.scala
new file mode 100644
index 0000000..c4eec07
--- /dev/null
+++ 
b/examples/distributedshell/src/main/scala/org/apache/gearpump/examples/distributedshell/DistributedShell.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.examples.distributedshell
+
+import org.slf4j.Logger
+
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
+import org.apache.gearpump.cluster.{Application, UserConfig}
+import org.apache.gearpump.util.{AkkaApp, LogUtil}
+
+/** Demo application to distribute and execute shell command on all machines 
of the cluster */
+object DistributedShell extends AkkaApp with ArgumentsParser {
+  private val LOG: Logger = LogUtil.getLogger(getClass)
+
+  override val options: Array[(String, CLIOption[Any])] = Array.empty
+
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    LOG.info(s"Distributed shell submitting application...")
+    val context = ClientContext(akkaConf)
+    val appId = 
context.submit(Application[DistShellAppMaster]("DistributedShell",
+    UserConfig.empty))
+    context.close()
+    LOG.info(s"Distributed Shell Application started with appId $appId !")
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/distributedshell/src/main/scala/org/apache/gearpump/examples/distributedshell/DistributedShellClient.scala
----------------------------------------------------------------------
diff --git 
a/examples/distributedshell/src/main/scala/org/apache/gearpump/examples/distributedshell/DistributedShellClient.scala
 
b/examples/distributedshell/src/main/scala/org/apache/gearpump/examples/distributedshell/DistributedShellClient.scala
new file mode 100644
index 0000000..8a7efa5
--- /dev/null
+++ 
b/examples/distributedshell/src/main/scala/org/apache/gearpump/examples/distributedshell/DistributedShellClient.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.examples.distributedshell
+
+import java.util.concurrent.TimeUnit
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
+import akka.pattern.ask
+import org.slf4j.{Logger, LoggerFactory}
+
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
+import 
org.apache.gearpump.examples.distributedshell.DistShellAppMaster.ShellCommand
+import org.apache.gearpump.util.{AkkaApp, Constants}
+
+/** Client to DistributedShell to input "shell command" */
+object DistributedShellClient extends AkkaApp with ArgumentsParser {
+  implicit val timeout = Constants.FUTURE_TIMEOUT
+  private val LOG: Logger = LoggerFactory.getLogger(getClass)
+
+  override val options: Array[(String, CLIOption[Any])] = Array(
+    "appid" -> CLIOption[Int]("<the distributed shell appid>", required = 
true),
+    "command" -> CLIOption[String]("<shell command>", required = true)
+  )
+
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    val config = parse(args)
+    val context = ClientContext(akkaConf)
+    implicit val system = context.system
+    implicit val dispatcher = system.dispatcher
+    val appid = config.getInt("appid")
+    val command = config.getString("command")
+    val appMaster = context.resolveAppID(appid)
+    LOG.info(s"Resolved appMaster $appid address $appMaster, sending command 
$command")
+    val future = (appMaster ? ShellCommand(command)).map { result =>
+      LOG.info(s"Result: \n$result")
+      context.close()
+    }
+    Await.ready(future, Duration(60, TimeUnit.SECONDS))
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/distributedshell/src/main/scala/org/apache/gearpump/examples/distributedshell/ShellExecutor.scala
----------------------------------------------------------------------
diff --git 
a/examples/distributedshell/src/main/scala/org/apache/gearpump/examples/distributedshell/ShellExecutor.scala
 
b/examples/distributedshell/src/main/scala/org/apache/gearpump/examples/distributedshell/ShellExecutor.scala
new file mode 100644
index 0000000..fc394d8
--- /dev/null
+++ 
b/examples/distributedshell/src/main/scala/org/apache/gearpump/examples/distributedshell/ShellExecutor.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.examples.distributedshell
+
+import scala.sys.process._
+import scala.util.{Failure, Success, Try}
+
+import akka.actor.Actor
+import org.slf4j.Logger
+
+import org.apache.gearpump.cluster.{ExecutorContext, UserConfig}
+import 
org.apache.gearpump.examples.distributedshell.DistShellAppMaster.{ShellCommand, 
ShellCommandResult}
+import org.apache.gearpump.util.LogUtil
+
+/** Executor actor on remote machine */
+class ShellExecutor(executorContext: ExecutorContext, userConf: UserConfig) 
extends Actor {
+  import executorContext._
+  private val LOG: Logger = LogUtil.getLogger(getClass, executor = executorId, 
app = appId)
+
+  LOG.info(s"ShellExecutor started!")
+
+  override def receive: Receive = {
+    case ShellCommand(command) =>
+      val process = Try(s"$command".!!)
+      val result = process match {
+        case Success(msg) => msg
+        case Failure(ex) => ex.getMessage
+      }
+      sender ! ShellCommandResult(executorId, result)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/DistShellAppMasterSpec.scala
----------------------------------------------------------------------
diff --git 
a/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/DistShellAppMasterSpec.scala
 
b/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/DistShellAppMasterSpec.scala
deleted file mode 100644
index 2d63734..0000000
--- 
a/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/DistShellAppMasterSpec.scala
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.examples.distributedshell
-
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-
-import akka.actor.ActorSystem
-import akka.testkit.{TestActorRef, TestProbe}
-import org.scalatest.{BeforeAndAfter, Matchers, WordSpec}
-
-import io.gearpump.cluster.AppMasterToMaster.{GetAllWorkers, 
RegisterAppMaster, RequestResource}
-import io.gearpump.cluster.AppMasterToWorker.LaunchExecutor
-import io.gearpump.cluster.MasterToAppMaster.{AppMasterRegistered, 
ResourceAllocated, WorkerList}
-import io.gearpump.cluster._
-import io.gearpump.cluster.appmaster.{AppMasterRuntimeEnvironment, 
AppMasterRuntimeInfo}
-import io.gearpump.cluster.scheduler.{Relaxation, Resource, 
ResourceAllocation, ResourceRequest}
-import io.gearpump.cluster.worker.WorkerId
-import io.gearpump.util.ActorSystemBooter.RegisterActorSystem
-import io.gearpump.util.ActorUtil
-
-class DistShellAppMasterSpec extends WordSpec with Matchers with 
BeforeAndAfter {
-  implicit val system = ActorSystem("AppMasterSpec", TestUtil.DEFAULT_CONFIG)
-  val mockMaster = TestProbe()(system)
-  val mockWorker1 = TestProbe()(system)
-  val masterProxy = mockMaster.ref
-  val appId = 0
-  val userName = "test"
-  val masterExecutorId = 0
-  val workerList = List(WorkerId(1, 0L), WorkerId(2, 0L), WorkerId(3, 0L))
-  val resource = Resource(1)
-  val appJar = None
-  val appDescription = AppDescription("app0", 
classOf[DistShellAppMaster].getName, UserConfig.empty)
-
-  "DistributedShell AppMaster" should {
-    "launch one ShellTask on each worker" in {
-      val appMasterInfo = AppMasterRuntimeInfo(appId, appName = appId.toString)
-      val appMasterContext = AppMasterContext(appId, userName, resource, null, 
appJar,
-        masterProxy, appMasterInfo)
-      TestActorRef[DistShellAppMaster](
-        AppMasterRuntimeEnvironment.props(List(masterProxy.path), 
appDescription, appMasterContext))
-      mockMaster.expectMsgType[RegisterAppMaster]
-      mockMaster.reply(AppMasterRegistered(appId))
-      // The DistributedShell AppMaster asks for worker list from Master.
-      mockMaster.expectMsg(GetAllWorkers)
-      mockMaster.reply(WorkerList(workerList))
-      // After worker list is ready, DistributedShell AppMaster requests 
resource on each worker
-      workerList.foreach { workerId =>
-        mockMaster.expectMsg(RequestResource(appId, 
ResourceRequest(Resource(1), workerId,
-          relaxation = Relaxation.SPECIFICWORKER)))
-      }
-      mockMaster.reply(ResourceAllocated(
-        Array(ResourceAllocation(resource, mockWorker1.ref, WorkerId(1, 0L)))))
-      mockWorker1.expectMsgClass(classOf[LaunchExecutor])
-      
mockWorker1.reply(RegisterActorSystem(ActorUtil.getSystemAddress(system).toString))
-    }
-  }
-
-  after {
-    system.terminate()
-    Await.result(system.whenTerminated, Duration.Inf)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/DistributedShellClientSpec.scala
----------------------------------------------------------------------
diff --git 
a/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/DistributedShellClientSpec.scala
 
b/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/DistributedShellClientSpec.scala
deleted file mode 100644
index 973b3b3..0000000
--- 
a/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/DistributedShellClientSpec.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.examples.distributedshell
-
-import scala.concurrent.Future
-import scala.util.{Success, Try}
-
-import akka.testkit.TestProbe
-import org.scalatest.{BeforeAndAfter, Matchers, PropSpec}
-
-import io.gearpump.cluster.ClientToMaster.ResolveAppId
-import io.gearpump.cluster.MasterToClient.ResolveAppIdResult
-import io.gearpump.cluster.{MasterHarness, TestUtil}
-import io.gearpump.examples.distributedshell.DistShellAppMaster.ShellCommand
-import io.gearpump.util.LogUtil
-
-class DistributedShellClientSpec
-  extends PropSpec with Matchers with BeforeAndAfter with MasterHarness {
-
-  private val LOG = LogUtil.getLogger(getClass)
-
-  before {
-    startActorSystem()
-  }
-
-  after {
-    shutdownActorSystem()
-  }
-
-  protected override def config = TestUtil.DEFAULT_CONFIG
-
-  property("DistributedShellClient should succeed to submit application with 
required arguments") {
-    val command = "ls /"
-    val requiredArgs = Array("-appid", "0", "-command", command)
-    val masterReceiver = createMockMaster()
-
-    assert(Try(DistributedShellClient.main(Array.empty[String])).isFailure,
-      "missing required arguments, print usage")
-
-    Future {
-      DistributedShellClient.main(masterConfig, requiredArgs)
-    }
-
-    masterReceiver.expectMsg(PROCESS_BOOT_TIME, ResolveAppId(0))
-    val mockAppMaster = TestProbe()(getActorSystem)
-    masterReceiver.reply(ResolveAppIdResult(Success(mockAppMaster.ref)))
-    LOG.info(s"Reply back ResolveAppIdResult, current actorRef: 
${mockAppMaster.ref.path.toString}")
-    mockAppMaster.expectMsg(PROCESS_BOOT_TIME, ShellCommand(command))
-    mockAppMaster.reply("result")
-  }
-}


Reply via email to