http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/io/gearpump/streaming/task/TaskActorSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/io/gearpump/streaming/task/TaskActorSpec.scala 
b/streaming/src/test/scala/io/gearpump/streaming/task/TaskActorSpec.scala
deleted file mode 100644
index a48f887..0000000
--- a/streaming/src/test/scala/io/gearpump/streaming/task/TaskActorSpec.scala
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming.task
-
-import akka.actor.{ExtendedActorSystem, Props}
-import akka.testkit._
-import com.typesafe.config.{Config, ConfigFactory}
-import org.mockito.Mockito.{mock, times, verify, when}
-import org.scalatest.{BeforeAndAfterEach, Matchers, WordSpec}
-
-import io.gearpump.Message
-import io.gearpump.cluster.{MasterHarness, TestUtil, UserConfig}
-import io.gearpump.partitioner.{HashPartitioner, Partitioner}
-import io.gearpump.serializer.{FastKryoSerializer, SerializationFramework}
-import io.gearpump.streaming.AppMasterToExecutor.{ChangeTask, 
MsgLostException, StartTask, TaskChanged, TaskRegistered}
-import io.gearpump.streaming.task.TaskActorSpec.TestTask
-import io.gearpump.streaming.{DAG, LifeTime, ProcessorDescription}
-import io.gearpump.util.Graph._
-import io.gearpump.util.{Graph, Util}
-
-class TaskActorSpec extends WordSpec with Matchers with BeforeAndAfterEach 
with MasterHarness {
-  protected override def config: Config = {
-    ConfigFactory.parseString(
-      """ akka.loggers = ["akka.testkit.TestEventListener"]
-        | akka.test.filter-leeway = 20000
-      """.stripMargin).
-      withFallback(TestUtil.DEFAULT_CONFIG)
-  }
-
-  val appId = 0
-  val task1 = ProcessorDescription(id = 0, taskClass = 
classOf[TestTask].getName, parallelism = 1)
-  val task2 = ProcessorDescription(id = 1, taskClass = 
classOf[TestTask].getName, parallelism = 1)
-  val dag: DAG = DAG(Graph(task1 ~ Partitioner[HashPartitioner] ~> task2))
-  val taskId1 = TaskId(0, 0)
-  val taskId2 = TaskId(1, 0)
-  val executorId1 = 1
-  val executorId2 = 2
-
-  var mockMaster: TestProbe = null
-  var taskContext1: TaskContextData = null
-
-  var mockSerializerPool: SerializationFramework = null
-
-  override def beforeEach(): Unit = {
-    startActorSystem()
-    mockMaster = TestProbe()(getActorSystem)
-
-    mockSerializerPool = mock(classOf[SerializationFramework])
-    val serializer = new 
FastKryoSerializer(getActorSystem.asInstanceOf[ExtendedActorSystem])
-    when(mockSerializerPool.get()).thenReturn(serializer)
-
-    taskContext1 = TaskContextData(executorId1, appId,
-      "appName", mockMaster.ref, 1,
-      LifeTime.Immortal,
-      Subscriber.of(processorId = 0, dag))
-  }
-
-  "TaskActor" should {
-    "register itself to AppMaster when started" in {
-      val mockTask = mock(classOf[TaskWrapper])
-      val testActor = TestActorRef[TaskActor](Props(
-        new TaskActor(taskId1, taskContext1, UserConfig.empty,
-          mockTask, mockSerializerPool)))(getActorSystem)
-      testActor ! TaskRegistered(taskId1, 0, Util.randInt())
-      testActor ! StartTask(taskId1)
-
-      implicit val system = getActorSystem
-      val ack = Ack(taskId2, 100, 99, testActor.underlyingActor.sessionId)
-      EventFilter[MsgLostException](occurrences = 1) intercept {
-        testActor ! ack
-      }
-    }
-
-    "respond to ChangeTask" in {
-      val mockTask = mock(classOf[TaskWrapper])
-      val testActor = TestActorRef[TaskActor](Props(new TaskActor(taskId1, 
taskContext1,
-      UserConfig.empty, mockTask, mockSerializerPool)))(getActorSystem)
-      testActor ! TaskRegistered(taskId1, 0, Util.randInt())
-      testActor ! StartTask(taskId1)
-      mockMaster.expectMsgType[GetUpstreamMinClock]
-
-      mockMaster.send(testActor, ChangeTask(taskId1, 1, LifeTime.Immortal, 
List.empty[Subscriber]))
-      mockMaster.expectMsgType[TaskChanged]
-    }
-
-    "handle received message correctly" in {
-      val mockTask = mock(classOf[TaskWrapper])
-      val msg = Message("test")
-
-      val testActor = TestActorRef[TaskActor](Props(new TaskActor(taskId1, 
taskContext1,
-      UserConfig.empty, mockTask, mockSerializerPool)))(getActorSystem)
-      testActor.tell(TaskRegistered(taskId1, 0, Util.randInt()), 
mockMaster.ref)
-      testActor.tell(StartTask(taskId1), mockMaster.ref)
-
-      testActor.tell(msg, testActor)
-
-      verify(mockTask, times(1)).onNext(msg)
-    }
-  }
-
-  override def afterEach(): Unit = {
-    shutdownActorSystem()
-  }
-}
-
-object TaskActorSpec {
-  class TestTask
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/org/apache/gearpump/streaming/DAGSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/DAGSpec.scala 
b/streaming/src/test/scala/org/apache/gearpump/streaming/DAGSpec.scala
new file mode 100644
index 0000000..5f4faee
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/DAGSpec.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming
+
+import org.apache.gearpump.partitioner.PartitionerDescription
+import org.apache.gearpump.streaming.task.TaskId
+import org.apache.gearpump.util.Graph
+import org.apache.gearpump.util.Graph.Node
+import org.scalacheck.Gen
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{Matchers, PropSpec}
+
+class DAGSpec extends PropSpec with PropertyChecks with Matchers {
+
+  val parallelismGen = Gen.chooseNum[Int](1, 100)
+
+  property("DAG should be built correctly for a single task") {
+    forAll(parallelismGen) { (parallelism: Int) =>
+      val task = ProcessorDescription(id = 0, taskClass = "task", parallelism 
= parallelism)
+      val graph = Graph[ProcessorDescription, PartitionerDescription](task)
+      val dag = DAG(graph)
+      dag.processors.size shouldBe 1
+      assert(dag.taskCount == parallelism)
+      dag.tasks.sortBy(_.index) shouldBe (0 until parallelism).map(index => 
TaskId(0, index))
+      dag.graph.edges shouldBe empty
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/org/apache/gearpump/streaming/MessageSerializerSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/MessageSerializerSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/MessageSerializerSpec.scala
new file mode 100644
index 0000000..f6f6af2
--- /dev/null
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/MessageSerializerSpec.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming
+
+import org.jboss.netty.buffer.{ChannelBufferOutputStream, ChannelBuffers}
+import org.scalatest.{Matchers, WordSpec}
+
+import org.apache.gearpump.streaming.task._
+import org.apache.gearpump.transport.netty.WrappedChannelBuffer
+
+class MessageSerializerSpec extends WordSpec with Matchers {
+
+  def testSerializer[T](obj: T, taskMessageSerializer: 
TaskMessageSerializer[T]): T = {
+    val length = taskMessageSerializer.getLength(obj)
+    val bout = new ChannelBufferOutputStream(ChannelBuffers.buffer(length))
+    taskMessageSerializer.write(bout, obj)
+    val input = new 
WrappedChannelBuffer(ChannelBuffers.wrappedBuffer(bout.buffer().array()))
+    taskMessageSerializer.read(input)
+  }
+
+  "SerializedMessageSerializer" should {
+    "serialize and deserialize SerializedMessage properly" in {
+      val serializer = new SerializedMessageSerializer
+      val data = new Array[Byte](256)
+      new java.util.Random().nextBytes(data)
+      val msg = SerializedMessage(1024, data)
+      val result = testSerializer(msg, serializer)
+      assert(result.timeStamp == msg.timeStamp && 
result.bytes.sameElements(msg.bytes))
+    }
+  }
+
+  "TaskIdSerializer" should {
+    "serialize and deserialize TaskId properly" in {
+      val taskIdSerializer = new TaskIdSerializer
+      val taskId = TaskId(1, 3)
+      assert(testSerializer(taskId, taskIdSerializer).equals(taskId))
+    }
+  }
+
+  "AckRequestSerializer" should {
+    "serialize and deserialize AckRequest properly" in {
+      val serializer = new AckRequestSerializer
+      val ackRequest = AckRequest(TaskId(1, 2), 1000, 1024)
+      assert(testSerializer(ackRequest, serializer).equals(ackRequest))
+    }
+  }
+
+  "InitialAckRequestSerializer" should {
+    "serialize and deserialize AckRequest properly" in {
+      val serializer = new InitialAckRequestSerializer
+      val ackRequest = InitialAckRequest(TaskId(1, 2), 1024)
+      assert(testSerializer(ackRequest, serializer).equals(ackRequest))
+    }
+  }
+
+  "AckSerializer" should {
+    "serialize and deserialize Ack properly" in {
+      val serializer = new AckSerializer
+      val ack = Ack(TaskId(1, 2), 1024, 1023, 1799)
+      assert(testSerializer(ack, serializer).equals(ack))
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/org/apache/gearpump/streaming/MockUtil.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/MockUtil.scala 
b/streaming/src/test/scala/org/apache/gearpump/streaming/MockUtil.scala
new file mode 100644
index 0000000..f24c024
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/MockUtil.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming
+
+import akka.actor.{Actor, ActorSystem}
+import akka.testkit.TestActorRef
+import org.apache.gearpump.cluster.TestUtil
+import org.apache.gearpump.streaming.task.{TaskContext, TaskId}
+import org.mockito.{ArgumentMatcher, Matchers, Mockito}
+
+object MockUtil {
+
+  lazy val system: ActorSystem = ActorSystem("mockUtil", 
TestUtil.DEFAULT_CONFIG)
+
+  def mockTaskContext: TaskContext = {
+    val context = Mockito.mock(classOf[TaskContext])
+    
Mockito.when(context.self).thenReturn(Mockito.mock(classOf[TestActorRef[Actor]]))
+    Mockito.when(context.system).thenReturn(system)
+    Mockito.when(context.parallelism).thenReturn(1)
+    Mockito.when(context.taskId).thenReturn(TaskId(0, 0))
+    context
+  }
+
+  def argMatch[T](func: T => Boolean): T = {
+    Matchers.argThat(new ArgumentMatcher[T] {
+      override def matches(param: Any): Boolean = {
+        val mesage = param.asInstanceOf[T]
+        func(mesage)
+      }
+    })
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/org/apache/gearpump/streaming/StreamingTestUtil.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/StreamingTestUtil.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/StreamingTestUtil.scala
new file mode 100644
index 0000000..f4fffb5
--- /dev/null
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/StreamingTestUtil.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming
+
+import akka.actor._
+import akka.testkit.TestActorRef
+import org.apache.gearpump.cluster.AppMasterToMaster.RegisterAppMaster
+import org.apache.gearpump.cluster.appmaster.AppMasterRuntimeInfo
+import org.apache.gearpump.cluster.scheduler.Resource
+import org.apache.gearpump.cluster.{AppDescription, AppMasterContext, 
MiniCluster, UserConfig}
+import org.apache.gearpump.streaming.appmaster.AppMaster
+import org.apache.gearpump.util.Graph
+
+object StreamingTestUtil {
+  private var executorId = 0
+  val testUserName = "testuser"
+
+  def startAppMaster(miniCluster: MiniCluster, appId: Int): 
TestActorRef[AppMaster] = {
+
+    implicit val actorSystem = miniCluster.system
+    val masterConf = AppMasterContext(appId, testUserName, Resource(1), null,
+      None, miniCluster.mockMaster, AppMasterRuntimeInfo(appId, appName = 
appId.toString))
+
+    val app = StreamApplication("test", Graph.empty, UserConfig.empty)
+    val appDescription = AppDescription(app.name, app.appMaster.getName, 
app.userConfig)
+    val props = Props(new AppMaster(masterConf, appDescription))
+    val appMaster = 
miniCluster.launchActor(props).asInstanceOf[TestActorRef[AppMaster]]
+    val registerAppMaster = RegisterAppMaster(appMaster, 
masterConf.registerData)
+    miniCluster.mockMaster.tell(registerAppMaster, appMaster)
+
+    appMaster
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala
new file mode 100644
index 0000000..38a5cf1
--- /dev/null
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.appmaster
+
+import akka.actor.{ActorRef, Props}
+import akka.testkit.{TestActorRef, TestProbe}
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.AppMasterToMaster._
+import org.apache.gearpump.cluster.AppMasterToWorker.LaunchExecutor
+import org.apache.gearpump.cluster.ClientToMaster.ShutdownApplication
+import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterRegistered, 
ResourceAllocated}
+import org.apache.gearpump.cluster.WorkerToAppMaster.ExecutorLaunchRejected
+import org.apache.gearpump.cluster._
+import org.apache.gearpump.cluster.appmaster.{AppMasterRuntimeEnvironment, 
AppMasterRuntimeInfo}
+import org.apache.gearpump.cluster.master.MasterProxy
+import org.apache.gearpump.cluster.scheduler.{Resource, ResourceAllocation, 
ResourceRequest}
+import org.apache.gearpump.cluster.worker.WorkerId
+import org.apache.gearpump.jarstore.FilePath
+import org.apache.gearpump.partitioner.HashPartitioner
+import org.apache.gearpump.streaming.task.{StartTime, TaskContext, _}
+import org.apache.gearpump.streaming.{Processor, StreamApplication}
+import org.apache.gearpump.util.Graph
+import org.apache.gearpump.util.Graph._
+import org.scalatest._
+
+import scala.concurrent.duration._
+
+class AppMasterSpec extends WordSpec with Matchers with BeforeAndAfterEach 
with MasterHarness {
+  protected override def config = TestUtil.DEFAULT_CONFIG
+
+  var appMaster: ActorRef = null
+
+  val appId = 0
+  val workerId = WorkerId(1, 0L)
+  val resource = Resource(1)
+  val taskDescription1 = Processor[TaskA](2)
+  val taskDescription2 = Processor[TaskB](2)
+  val partitioner = new HashPartitioner
+  var conf: UserConfig = null
+
+  var mockTask: TestProbe = null
+
+  var mockMaster: TestProbe = null
+  var mockMasterProxy: ActorRef = null
+
+  var mockWorker: TestProbe = null
+  var appDescription: AppDescription = null
+  var appMasterContext: AppMasterContext = null
+  var appMasterRuntimeInfo: AppMasterRuntimeInfo = null
+
+  override def beforeEach(): Unit = {
+    startActorSystem()
+
+    mockTask = TestProbe()(getActorSystem)
+
+    mockMaster = TestProbe()(getActorSystem)
+    mockWorker = TestProbe()(getActorSystem)
+    mockMaster.ignoreMsg(ignoreSaveAppData)
+    appMasterRuntimeInfo = AppMasterRuntimeInfo(appId, appName = 
appId.toString)
+
+    implicit val system = getActorSystem
+    conf = UserConfig.empty.withValue(AppMasterSpec.MASTER, mockMaster.ref)
+    val mockJar = AppJar("for_test", FilePath("path"))
+    appMasterContext = AppMasterContext(appId, "test", resource, null, 
Some(mockJar),
+      mockMaster.ref, appMasterRuntimeInfo)
+    val graph = Graph(taskDescription1 ~ partitioner ~> taskDescription2)
+    val streamApp = StreamApplication("test", graph, conf)
+    appDescription = Application.ApplicationToAppDescription(streamApp)
+    import scala.concurrent.duration._
+    mockMasterProxy = getActorSystem.actorOf(Props(new 
MasterProxy(List(mockMaster.ref.path),
+      30.seconds)), AppMasterSpec.MOCK_MASTER_PROXY)
+    TestActorRef[AppMaster](
+      AppMasterRuntimeEnvironment.props(List(mockMasterProxy.path), 
appDescription,
+        appMasterContext))(getActorSystem)
+
+    val registerAppMaster = mockMaster.receiveOne(15.seconds)
+    assert(registerAppMaster.isInstanceOf[RegisterAppMaster])
+    appMaster = registerAppMaster.asInstanceOf[RegisterAppMaster].appMaster
+
+    mockMaster.reply(AppMasterRegistered(appId))
+    mockMaster.expectMsg(15.seconds, GetAppData(appId, "DAG"))
+    mockMaster.reply(GetAppDataResult("DAG", null))
+    mockMaster.expectMsg(15.seconds, GetAppData(appId, "startClock"))
+
+    mockMaster.reply(GetAppDataResult("startClock", 0L))
+
+    mockMaster.expectMsg(15.seconds, RequestResource(appId, 
ResourceRequest(Resource(4),
+      workerId = WorkerId.unspecified)))
+  }
+
+  override def afterEach(): Unit = {
+    shutdownActorSystem()
+  }
+
+  "AppMaster" should {
+    "kill it self when allocate resource time out" in {
+      mockMaster.reply(ResourceAllocated(Array(ResourceAllocation(Resource(2),
+        mockWorker.ref, workerId))))
+      mockMaster.expectMsg(60.seconds, ShutdownApplication(appId))
+    }
+
+    "reschedule the resource when the worker reject to start executor" in {
+      val resource = Resource(4)
+      mockMaster.reply(ResourceAllocated(Array(ResourceAllocation(resource,
+        mockWorker.ref, workerId))))
+      mockWorker.expectMsgClass(classOf[LaunchExecutor])
+      mockWorker.reply(ExecutorLaunchRejected(""))
+      mockMaster.expectMsg(RequestResource(appId, ResourceRequest(resource, 
WorkerId.unspecified)))
+    }
+
+    "find a new master when lost connection with master" in {
+
+      val watcher = TestProbe()(getActorSystem)
+      watcher.watch(mockMasterProxy)
+      getActorSystem.stop(mockMasterProxy)
+      watcher.expectTerminated(mockMasterProxy)
+      // Make sure the parent of mockMasterProxy has received the Terminated 
message.
+      // Issue address: https://github.com/gearpump/gearpump/issues/1919
+      Thread.sleep(2000)
+
+      import scala.concurrent.duration._
+      mockMasterProxy = getActorSystem.actorOf(Props(new 
MasterProxy(List(mockMaster.ref.path),
+        30.seconds)), AppMasterSpec.MOCK_MASTER_PROXY)
+      mockMaster.expectMsgClass(15.seconds, classOf[RegisterAppMaster])
+    }
+
+    //    // TODO: This test is failing on Travis randomly
+    //    // We have not identified the root cause.
+    //    // Check: https://travis-ci.org/intel-hadoop/gearpump/builds/56826843
+    //    // Issue tracker: https://github.com/intel-hadoop/gearpump/issues/733
+    //
+    //    "launch executor and task properly" in {
+    //      
mockMaster.reply(ResourceAllocated(Array(ResourceAllocation(Resource(4), 
mockWorker.ref,
+    //       workerId))))
+    //      mockWorker.expectMsgClass(classOf[LaunchExecutor])
+    //
+    //      val workerSystem = ActorSystem("worker", TestUtil.DEFAULT_CONFIG)
+    //      
mockWorker.reply(RegisterActorSystem(ActorUtil.getSystemAddress(workerSystem).toString))
+    //      for (i <- 1 to 4) {
+    //        mockMaster.expectMsg(10 seconds, AppMasterSpec.TaskStarted)
+    //      }
+    //
+    //      // clock status: task(0,0) -> 1, task(0,1)->0, task(1, 0)->0, 
task(1,1)->0
+    //      appMaster.tell(UpdateClock(TaskId(0, 0), 1), mockTask.ref)
+    //
+    //      // there is no further upstream, so the upstreamMinClock is 
Long.MaxValue
+    //      mockTask.expectMsg(UpstreamMinClock(Long.MaxValue))
+    //
+    //      // check min clock
+    //      appMaster.tell(GetLatestMinClock, mockTask.ref)
+    //      mockTask.expectMsg(LatestMinClock(0))
+    //
+    //
+    //      // clock status: task(0,0) -> 1, task(0,1)->1, task(1, 0)->0, 
task(1,1)->0
+    //      appMaster.tell(UpdateClock(TaskId(0, 1), 1), mockTask.ref)
+    //
+    //      // there is no further upstream, so the upstreamMinClock is 
Long.MaxValue
+    //      mockTask.expectMsg(UpstreamMinClock(Long.MaxValue))
+    //
+    //      // check min clock
+    //      appMaster.tell(GetLatestMinClock, mockTask.ref)
+    //      mockTask.expectMsg(LatestMinClock(0))
+    //
+    //      // Clock status: task(0,0) -> 1, task(0,1)->1, task(1, 1)->0, 
task(1,1)->0
+    //      appMaster.tell(UpdateClock(TaskId(1, 0), 1), mockTask.ref)
+    //
+    //      // Min clock of processor 0 (Task(0, 0) and Task(0, 1))
+    //      mockTask.expectMsg(UpstreamMinClock(1))
+    //
+    //      // check min clock
+    //      appMaster.tell(GetLatestMinClock, mockTask.ref)
+    //      mockTask.expectMsg(LatestMinClock(0))
+    //
+    //      // clock status: task(0,0) -> 1, task(0,1)->1, task(1, 1)->0, 
task(1,1)->1
+    //      appMaster.tell(UpdateClock(TaskId(1, 1), 1), mockTask.ref)
+    //
+    //      // min clock of processor 0 (Task(0, 0) and Task(0, 1))
+    //      mockTask.expectMsg(UpstreamMinClock(1))
+    //
+    //      // check min clock
+    //      appMaster.tell(GetLatestMinClock, mockTask.ref)
+    //      mockTask.expectMsg(LatestMinClock(1))
+    //
+    //      // shutdown worker and all executor on this work, expect appmaster 
to ask
+    //      // for new resources
+    //      workerSystem.shutdown()
+    //      mockMaster.expectMsg(RequestResource(appId, 
ResourceRequest(Resource(4), relaxation =
+    //        Relaxation.ONEWORKER)))
+    //    }
+  }
+
+  def ignoreSaveAppData: PartialFunction[Any, Boolean] = {
+    case msg: SaveAppData => true
+  }
+}
+
+object AppMasterSpec {
+  val MASTER = "master"
+  case object TaskStarted
+
+  val MOCK_MASTER_PROXY = "mockMasterProxy"
+}
+
+class TaskA(taskContext: TaskContext, userConf: UserConfig) extends 
Task(taskContext, userConf) {
+
+  val master = userConf.getValue[ActorRef](AppMasterSpec.MASTER).get
+  override def onStart(startTime: StartTime): Unit = {
+    master ! AppMasterSpec.TaskStarted
+  }
+
+  override def onNext(msg: Message): Unit = {}
+}
+
+class TaskB(taskContext: TaskContext, userConf: UserConfig) extends 
Task(taskContext, userConf) {
+
+  val master = userConf.getValue[ActorRef](AppMasterSpec.MASTER).get
+  override def onStart(startTime: StartTime): Unit = {
+    master ! AppMasterSpec.TaskStarted
+  }
+
+  override def onNext(msg: Message): Unit = {}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ClockServiceSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ClockServiceSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ClockServiceSpec.scala
new file mode 100644
index 0000000..2bb33b7
--- /dev/null
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ClockServiceSpec.scala
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.appmaster
+
+import akka.actor.{ActorSystem, Props}
+import akka.testkit.{ImplicitSender, TestKit, TestProbe}
+import org.apache.gearpump.cluster.{TestUtil, UserConfig}
+import org.apache.gearpump.partitioner.{HashPartitioner, Partitioner, 
PartitionerDescription}
+import org.apache.gearpump.streaming.appmaster.ClockService.{ChangeToNewDAG, 
ChangeToNewDAGSuccess, HealthChecker, ProcessorClock}
+import org.apache.gearpump.streaming.appmaster.ClockServiceSpec.Store
+import org.apache.gearpump.streaming.storage.AppDataStore
+import org.apache.gearpump.streaming.task.{GetLatestMinClock, GetStartClock, 
UpstreamMinClock, _}
+import org.apache.gearpump.streaming.{DAG, LifeTime, ProcessorDescription}
+import org.apache.gearpump.util.Graph
+import org.apache.gearpump.util.Graph._
+import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
+
+import scala.concurrent.{Future, Promise}
+
+class ClockServiceSpec(_system: ActorSystem) extends TestKit(_system) with 
ImplicitSender
+  with WordSpecLike with Matchers with BeforeAndAfterAll {
+
+  def this() = this(ActorSystem("ClockServiceSpec", TestUtil.DEFAULT_CONFIG))
+
+  val hash = Partitioner[HashPartitioner]
+  val task1 = ProcessorDescription(id = 0, taskClass = 
classOf[TaskActor].getName, parallelism = 1)
+  val task2 = ProcessorDescription(id = 1, taskClass = 
classOf[TaskActor].getName, parallelism = 1)
+  val dag = DAG(Graph(task1 ~ hash ~> task2))
+
+  override def afterAll {
+    TestKit.shutdownActorSystem(system)
+  }
+
+  "The ClockService" should {
+    "maintain a global view of message timestamp in the application" in {
+      val store = new Store()
+      val startClock = 100L
+      store.put(ClockService.START_CLOCK, startClock)
+      val clockService = system.actorOf(Props(new ClockService(dag, store)))
+      clockService ! GetLatestMinClock
+      expectMsg(LatestMinClock(startClock))
+
+      // task(0,0): clock(101); task(1,0): clock(100)
+      clockService ! UpdateClock(TaskId(0, 0), 101)
+
+      // There is no upstream, so pick Long.MaxValue
+      expectMsg(UpstreamMinClock(Long.MaxValue))
+
+      // Min clock is updated
+      clockService ! GetLatestMinClock
+      expectMsg(LatestMinClock(100))
+
+      // task(0,0): clock(101); task(1,0): clock(101)
+      clockService ! UpdateClock(TaskId(1, 0), 101)
+
+      // Upstream is Task(0, 0), 101
+      expectMsg(UpstreamMinClock(101))
+
+      // Min clock is updated
+      clockService ! GetLatestMinClock
+      expectMsg(LatestMinClock(101))
+    }
+
+    "act on ChangeToNewDAG and make sure downstream clock smaller than 
upstreams" in {
+      val store = new Store()
+      val startClock = 100L
+      store.put(ClockService.START_CLOCK, startClock)
+      val clockService = system.actorOf(Props(new ClockService(dag, store)))
+      val task = TestProbe()
+      clockService.tell(UpdateClock(TaskId(0, 0), 200), task.ref)
+      task.expectMsgType[UpstreamMinClock]
+
+      val task3 = ProcessorDescription(id = 3, taskClass = 
classOf[TaskActor].getName,
+        parallelism = 1)
+      val task4 = ProcessorDescription(id = 4, taskClass = 
classOf[TaskActor].getName,
+        parallelism = 1)
+      val task5 = ProcessorDescription(id = 5, taskClass = 
classOf[TaskActor].getName,
+        parallelism = 1)
+      val dagAddMiddleNode = DAG(Graph(
+        task1 ~ hash ~> task2,
+        task1 ~ hash ~> task3,
+        task3 ~ hash ~> task2,
+        task2 ~ hash ~> task4,
+        task5 ~ hash ~> task1
+      ))
+      val user = TestProbe()
+      clockService.tell(ChangeToNewDAG(dagAddMiddleNode), user.ref)
+
+      val clocks = user.expectMsgPF() {
+        case ChangeToNewDAGSuccess(clocks) =>
+          clocks
+      }
+
+      // For intermediate task, pick its upstream as initial clock
+      assert(clocks(task3.id) == clocks(task1.id))
+
+      // For sink task, pick its upstream as initial clock
+      assert(clocks(task4.id) == clocks(task2.id))
+
+      // For source task, set the initial clock as startClock
+      assert(clocks(task5.id) == startClock)
+    }
+
+    "maintain global checkpoint time" in {
+      val store = new Store()
+      val startClock = 100L
+      store.put(ClockService.START_CLOCK, startClock)
+      val clockService = system.actorOf(Props(new ClockService(dag, store)))
+      clockService ! UpdateClock(TaskId(0, 0), 200L)
+      expectMsgType[UpstreamMinClock]
+      clockService ! UpdateClock(TaskId(1, 0), 200L)
+      expectMsgType[UpstreamMinClock]
+
+      clockService ! GetStartClock
+      expectMsg(StartClock(200L))
+
+      val conf = UserConfig.empty.withBoolean("state.checkpoint.enable", true)
+      val task3 = ProcessorDescription(id = 3, taskClass = 
classOf[TaskActor].getName,
+        parallelism = 1, taskConf = conf)
+      val task4 = ProcessorDescription(id = 4, taskClass = 
classOf[TaskActor].getName,
+        parallelism = 1, taskConf = conf)
+      val dagWithStateTasks = DAG(Graph(
+        task1 ~ hash ~> task2,
+        task1 ~ hash ~> task3,
+        task3 ~ hash ~> task2,
+        task2 ~ hash ~> task4
+      ), version = 1)
+
+      val taskId3 = TaskId(3, 0)
+      val taskId4 = TaskId(4, 0)
+
+      clockService ! ChangeToNewDAG(dagWithStateTasks)
+      expectMsgType[ChangeToNewDAGSuccess]
+
+      clockService ! ReportCheckpointClock(taskId3, startClock)
+      clockService ! ReportCheckpointClock(taskId4, startClock)
+      clockService ! GetStartClock
+      expectMsg(StartClock(startClock))
+
+      clockService ! ReportCheckpointClock(taskId3, 200L)
+      clockService ! ReportCheckpointClock(taskId4, 300L)
+      clockService ! GetStartClock
+      expectMsg(StartClock(startClock))
+
+      clockService ! ReportCheckpointClock(taskId3, 300L)
+      clockService ! GetStartClock
+      expectMsg(StartClock(300L))
+    }
+  }
+
+  "ProcessorClock" should {
+    "maintain the min clock of current processor" in {
+      val processorId = 0
+      val parallism = 3
+      val clock = new ProcessorClock(processorId, LifeTime.Immortal, parallism)
+      clock.init(100L)
+      clock.updateMinClock(0, 101)
+      assert(clock.min == 100L)
+
+      clock.updateMinClock(1, 102)
+      assert(clock.min == 100L)
+
+      clock.updateMinClock(2, 103)
+      assert(clock.min == 101L)
+    }
+  }
+
+  "HealthChecker" should {
+    "report stalling if the clock is not advancing" in {
+      val healthChecker = new HealthChecker(stallingThresholdSeconds = 1)
+      val source = ProcessorDescription(id = 0, taskClass = null, parallelism 
= 1)
+      val sourceClock = new ProcessorClock(0, LifeTime.Immortal, 1)
+      sourceClock.init(0L)
+      val sink = ProcessorDescription(id = 1, taskClass = null, parallelism = 
1)
+      val sinkClock = new ProcessorClock(1, LifeTime.Immortal, 1)
+      sinkClock.init(0L)
+      val graph = Graph.empty[ProcessorDescription, PartitionerDescription]
+      graph.addVertex(source)
+      graph.addVertex(sink)
+      graph.addEdge(source, PartitionerDescription(null), sink)
+      val dag = DAG(graph)
+      val clocks = Map(
+        0 -> sourceClock,
+        1 -> sinkClock
+      )
+
+      sourceClock.updateMinClock(0, 100L)
+      sinkClock.updateMinClock(0, 100L)
+
+      // Clock advances from 0 to 100, there is no stalling.
+      healthChecker.check(currentMinClock = 100, clocks, dag, 200)
+      healthChecker.getReport.stallingTasks shouldBe List.empty[TaskId]
+
+      // Clock not advancing.
+      // Elapsed time exceeds the stalling threshold, so report stalling
+      healthChecker.check(currentMinClock = 100, clocks, dag, 1300)
+
+      // The source task is stalling the clock
+      healthChecker.getReport.stallingTasks shouldBe List(TaskId(0, 0))
+
+      // Advance the source clock
+      sourceClock.updateMinClock(0, 101L)
+      healthChecker.check(currentMinClock = 100, clocks, dag, 1300)
+      // The sink task is stalling the clock
+      healthChecker.getReport.stallingTasks shouldBe List(TaskId(1, 0))
+    }
+  }
+}
+
+object ClockServiceSpec {
+
+  class Store extends AppDataStore {
+
+    private var map = Map.empty[String, Any]
+
+    def put(key: String, value: Any): Future[Any] = {
+      map = map + (key -> value)
+      Promise.successful(value).future
+    }
+
+    def get(key: String): Future[Any] = {
+      Promise.successful(map.get(key).getOrElse(null)).future
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/DagManagerSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/DagManagerSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/DagManagerSpec.scala
new file mode 100644
index 0000000..258c7ff
--- /dev/null
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/DagManagerSpec.scala
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.appmaster
+
+import akka.actor.{ActorSystem, Props}
+import akka.testkit.TestProbe
+import org.apache.gearpump.cluster.{TestUtil, UserConfig}
+import org.apache.gearpump.partitioner.{HashPartitioner, Partitioner}
+import org.apache.gearpump.streaming.appmaster.DagManager.{DAGOperationFailed, 
DAGOperationSuccess, GetLatestDAG, GetTaskLaunchData, LatestDAG, 
NewDAGDeployed, ReplaceProcessor, TaskLaunchData, WatchChange}
+import org.apache.gearpump.streaming.task.{Subscriber, TaskActor}
+import org.apache.gearpump.streaming.{DAG, LifeTime, ProcessorDescription, 
StreamApplication}
+import org.apache.gearpump.util.Graph
+import org.apache.gearpump.util.Graph._
+import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
+
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
+class DagManagerSpec extends WordSpecLike with Matchers with BeforeAndAfterAll 
{
+
+  val hash = Partitioner[HashPartitioner]
+  val task1 = ProcessorDescription(id = 1, taskClass = 
classOf[TaskActor].getName, parallelism = 1)
+  val task2 = ProcessorDescription(id = 2, taskClass = 
classOf[TaskActor].getName, parallelism = 1)
+  val graph = Graph(task1 ~ hash ~> task2)
+  val dag = DAG(graph)
+  implicit var system: ActorSystem = null
+  val appId = 0
+  lazy val userConfig = UserConfig.empty.withValue(StreamApplication.DAG, 
graph)
+
+  "DagManager" should {
+    import org.apache.gearpump.streaming.appmaster.ClockServiceSpec.Store
+    "maintain the dags properly" in {
+      val store = new Store
+
+      val dagManager = system.actorOf(Props(new DagManager(appId, userConfig, 
store, Some(dag))))
+      val client = TestProbe()
+      client.send(dagManager, GetLatestDAG)
+      client.expectMsg(LatestDAG(dag))
+
+      client.send(dagManager, GetTaskLaunchData(dag.version, task1.id, null))
+      val task1LaunchData = TaskLaunchData(task1, Subscriber.of(task1.id, dag))
+      client.expectMsg(task1LaunchData)
+
+      val task2LaunchData = TaskLaunchData(task2, Subscriber.of(task2.id, dag))
+      client.send(dagManager, GetTaskLaunchData(dag.version, task2.id, null))
+      client.expectMsg(task2LaunchData)
+
+      val watcher = TestProbe()
+      client.send(dagManager, WatchChange(watcher.ref))
+      val task3 = task2.copy(id = 3, life = LifeTime(100, Long.MaxValue))
+
+      client.send(dagManager, ReplaceProcessor(task2.id, task3))
+      client.expectMsg(DAGOperationSuccess)
+
+      client.send(dagManager, GetLatestDAG)
+      val newDag = client.expectMsgPF() {
+        case LatestDAG(dag) => dag
+      }
+      assert(newDag.processors.contains(task3.id))
+      watcher.expectMsgType[LatestDAG]
+
+      val task4 = task3.copy(id = 4)
+      client.send(dagManager, ReplaceProcessor(task3.id, task4))
+      client.expectMsgType[DAGOperationFailed]
+
+      client.send(dagManager, NewDAGDeployed(newDag.version))
+      client.send(dagManager, ReplaceProcessor(task3.id, task4))
+      client.expectMsg(DAGOperationSuccess)
+    }
+
+    "retrieve last stored dag properly" in {
+      val store = new Store
+      val newGraph = Graph(task1 ~ hash ~> task2 ~> task2)
+      val newDag = DAG(newGraph)
+      store.put(StreamApplication.DAG, newDag)
+      val dagManager = system.actorOf(Props(new DagManager(appId, userConfig, 
store, Some(dag))))
+      val client = TestProbe()
+      client.send(dagManager, GetLatestDAG)
+      client.expectMsg(LatestDAG(newDag))
+    }
+  }
+
+  override def afterAll {
+    system.terminate()
+    Await.result(system.whenTerminated, Duration.Inf)
+  }
+
+  override def beforeAll {
+    this.system = ActorSystem("DagManagerSpec", TestUtil.DEFAULT_CONFIG)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ExecutorManagerSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ExecutorManagerSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ExecutorManagerSpec.scala
new file mode 100644
index 0000000..42b2618
--- /dev/null
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ExecutorManagerSpec.scala
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.appmaster
+
+import akka.actor._
+import akka.testkit.TestProbe
+import com.typesafe.config.ConfigFactory
+import org.apache.gearpump.TestProbeUtil
+import org.apache.gearpump.cluster.AppMasterToWorker.ChangeExecutorResource
+import org.apache.gearpump.cluster._
+import 
org.apache.gearpump.cluster.appmaster.ExecutorSystemScheduler.{ExecutorSystemStarted,
 StartExecutorSystemTimeout, StartExecutorSystems}
+import org.apache.gearpump.cluster.appmaster.{ExecutorSystem, WorkerInfo}
+import org.apache.gearpump.cluster.scheduler.{Resource, ResourceRequest}
+import org.apache.gearpump.cluster.worker.WorkerId
+import org.apache.gearpump.jarstore.FilePath
+import org.apache.gearpump.streaming.ExecutorId
+import org.apache.gearpump.streaming.ExecutorToAppMaster.RegisterExecutor
+import 
org.apache.gearpump.streaming.appmaster.ExecutorManager.{ExecutorStarted, _}
+import 
org.apache.gearpump.streaming.appmaster.ExecutorManagerSpec.StartExecutorActorPlease
+import org.apache.gearpump.util.ActorSystemBooter.BindLifeCycle
+import org.apache.gearpump.util.LogUtil
+import org.scalatest._
+
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
+class ExecutorManagerSpec extends FlatSpec with Matchers with 
BeforeAndAfterAll {
+  implicit var system: ActorSystem = null
+
+  private val LOG = LogUtil.getLogger(getClass)
+  private val appId = 0
+  private val resource = Resource(10)
+
+  override def beforeAll(): Unit = {
+    system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
+  }
+
+  override def afterAll(): Unit = {
+    system.terminate()
+    Await.result(system.whenTerminated, Duration.Inf)
+  }
+
+  private def startExecutorSystems = {
+    val master = TestProbe()
+    val taskManager = TestProbe()
+    val executor = TestProbe()
+    val userConfig = UserConfig.empty
+
+    val username = "user"
+    val appName = "app"
+    val appJar = Some(AppJar("for_test", FilePath("path")))
+
+    val appMasterContext = AppMasterContext(appId, username, null, null, 
appJar, master.ref, null)
+
+    val executorFactory = (_: ExecutorContext, _: UserConfig, _: Address, _: 
ExecutorId) => {
+      executor.ref ! StartExecutorActorPlease
+      TestProbeUtil.toProps(executor)
+    }
+    val executorManager = system.actorOf(Props(new ExecutorManager(userConfig, 
appMasterContext,
+      executorFactory, ConfigFactory.empty, appName)))
+
+    taskManager.send(executorManager, SetTaskManager(taskManager.ref))
+    val resourceRequest = Array(ResourceRequest(resource, 
WorkerId.unspecified))
+
+    // Starts executors
+    taskManager.send(executorManager, StartExecutors(resourceRequest, 
appJar.get))
+
+    // Asks master to start executor systems
+    import scala.concurrent.duration._
+    val startExecutorSystem = 
master.receiveOne(5.seconds).asInstanceOf[StartExecutorSystems]
+    assert(startExecutorSystem.resources == resourceRequest)
+    import startExecutorSystem.executorSystemConfig.{classPath, 
executorAkkaConfig, jar, jvmArguments, username => returnedUserName}
+    assert(startExecutorSystem.resources == resourceRequest)
+
+    assert(classPath.length == 0)
+    assert(jvmArguments.length == 0)
+    assert(jar == appJar)
+    assert(returnedUserName == username)
+    assert(executorAkkaConfig.isEmpty)
+
+    (master, executor, taskManager, executorManager)
+  }
+
+  it should "report timeout to taskManager" in {
+    import org.apache.gearpump.streaming.appmaster.ExecutorManager._
+    val (master, executor, taskManager, executorManager) = startExecutorSystems
+    master.reply(StartExecutorSystemTimeout)
+    taskManager.expectMsg(StartExecutorsTimeOut)
+  }
+
+  it should "start executor actor correctly" in {
+    val (master, executor, taskManager, executorManager) = startExecutorSystems
+    val executorSystemDaemon = TestProbe()
+    val worker = TestProbe()
+    val workerId = WorkerId(0, 0L)
+    val workerInfo = WorkerInfo(workerId, worker.ref)
+    val executorSystem = ExecutorSystem(0, null, executorSystemDaemon.ref,
+      resource, workerInfo)
+    master.reply(ExecutorSystemStarted(executorSystem, None))
+    import scala.concurrent.duration._
+    val bindLifeWith = 
executorSystemDaemon.receiveOne(3.seconds).asInstanceOf[BindLifeCycle]
+    val proxyExecutor = bindLifeWith.actor
+    executor.expectMsg(StartExecutorActorPlease)
+
+    val executorId = 0
+
+    // Registers executor
+    executor.send(executorManager, RegisterExecutor(proxyExecutor, executorId,
+      resource, workerInfo))
+    taskManager.expectMsgType[ExecutorStarted]
+
+    // Broadcasts message to children
+    taskManager.send(executorManager, BroadCast("broadcast"))
+    executor.expectMsg("broadcast")
+
+    // Unicast
+    taskManager.send(executorManager, UniCast(executorId, "unicast"))
+    executor.expectMsg("unicast")
+
+    // Updates executor resource status
+    val usedResource = Resource(5)
+    executorManager ! ExecutorResourceUsageSummary(Map(executorId -> 
usedResource))
+    worker.expectMsg(ChangeExecutorResource(appId, executorId, resource - 
usedResource))
+
+    // Watches for executor termination
+    system.stop(executor.ref)
+    LOG.info("Shutting down executor, and wait taskManager to get notified")
+    taskManager.expectMsg(ExecutorStopped(executorId))
+  }
+}
+
+object ExecutorManagerSpec {
+  case object StartExecutorActorPlease
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ExecutorRestartPolicySpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ExecutorRestartPolicySpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ExecutorRestartPolicySpec.scala
new file mode 100644
index 0000000..5f3905f
--- /dev/null
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ExecutorRestartPolicySpec.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.appmaster
+
+import org.apache.gearpump.streaming.executor.ExecutorRestartPolicy
+import org.apache.gearpump.streaming.task.TaskId
+import org.scalatest.{Matchers, WordSpec}
+
+import scala.concurrent.duration._
+
+class ExecutorRestartPolicySpec extends WordSpec with Matchers {
+
+  "ExecutorRestartPolicy" should {
+    "decide whether to restart the executor" in {
+      val executorId1 = 1
+      val executorId2 = 2
+      val taskId = TaskId(0, 0)
+      val executorSupervisor = new ExecutorRestartPolicy(
+        maxNrOfRetries = 3, withinTimeRange = 1.seconds)
+      executorSupervisor.addTaskToExecutor(executorId1, taskId)
+      assert(executorSupervisor.allowRestartExecutor(executorId1))
+      assert(executorSupervisor.allowRestartExecutor(executorId1))
+      executorSupervisor.addTaskToExecutor(executorId2, taskId)
+      assert(executorSupervisor.allowRestartExecutor(executorId2))
+      assert(!executorSupervisor.allowRestartExecutor(executorId2))
+      Thread.sleep(1000)
+      assert(executorSupervisor.allowRestartExecutor(executorId2))
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/HistoryMetricsServiceSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/HistoryMetricsServiceSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/HistoryMetricsServiceSpec.scala
new file mode 100644
index 0000000..5157284
--- /dev/null
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/HistoryMetricsServiceSpec.scala
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.appmaster
+
+import akka.actor.{ActorSystem, Props}
+import akka.testkit.TestProbe
+import org.apache.gearpump.cluster.ClientToMaster.QueryHistoryMetrics
+import org.apache.gearpump.cluster.MasterToClient.HistoryMetrics
+import org.apache.gearpump.cluster.TestUtil
+import org.apache.gearpump.metrics.Metrics.{Counter, Histogram, Meter}
+import org.apache.gearpump.util.HistoryMetricsService
+import org.apache.gearpump.util.HistoryMetricsService._
+import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
+
+import scala.concurrent.Await
+
+class HistoryMetricsServiceSpec extends FlatSpec with Matchers with 
BeforeAndAfterEach {
+
+  val count = 2
+  val intervalMs = 10
+
+  val config = HistoryMetricsConfig(
+    retainHistoryDataHours = 72,
+    retainHistoryDataIntervalMs = 3600 * 1000,
+    retainRecentDataSeconds = 300,
+    retainRecentDataIntervalMs = 15 * 1000)
+
+  "SingleValueMetricsStore" should "retain metrics and expire old value" in {
+
+    val store = new SingleValueMetricsStore(count, intervalMs)
+
+    var now = 0L
+    // Only 1 data point will be kept in @intervalMs
+    store.add(Counter("count", 1), now)
+    store.add(Counter("count", 2), now)
+
+    now = now + intervalMs + 1
+
+    // Only 1 data point will be kept in @intervalMs
+    store.add(Counter("count", 3), now)
+    store.add(Counter("count", 4), now)
+
+    now = now + intervalMs + 1
+
+    // Only 1 data point will be kept in @intervalMs
+    // expire oldest data point, because we only keep @count records
+    store.add(Counter("count", 5), now)
+    store.add(Counter("count", 6), now)
+
+    val result = store.read
+    assert(result.size == count)
+
+    // The oldest value is expired
+    assert(result.head.value.asInstanceOf[Counter].value == 3L)
+
+    // The newest value is inserted
+    assert(result.last.value.asInstanceOf[Counter].value == 5L)
+  }
+
+  val meterTemplate = Meter("meter", 0, 0, 0, "s")
+
+  "HistogramMetricsStore" should "retain corse-grain history and fine-grain 
recent data" in {
+    val store = new HistogramMetricsStore(config)
+    val a = Histogram(null, 100, 0, 0, 0, 0, 0)
+    val b = Histogram(null, 200, 0, 0, 0, 0, 0)
+    val c = Histogram(null, 50, 0, 0, 0, 0, 0)
+
+    store.add(a)
+    store.add(b)
+    store.add(c)
+
+    assert(store.readLatest.map(_.value) == List(c))
+    assert(store.readRecent.map(_.value) == List(a))
+    assert(store.readHistory.map(_.value) == List(a))
+  }
+
+  "MeterMetricsStore" should "retain corse-grain history and fine-grain recent 
data" in {
+    val store = new MeterMetricsStore(config)
+
+    val a = Meter(null, 1, 100, 0, null)
+    val b = Meter(null, 1, 200, 0, null)
+    val c = Meter(null, 1, 50, 0, null)
+
+    store.add(a)
+    store.add(b)
+    store.add(c)
+
+    assert(store.readLatest.map(_.value) == List(c))
+    assert(store.readRecent.map(_.value) == List(a))
+    assert(store.readHistory.map(_.value) == List(a))
+  }
+
+  "CounterMetricsStore" should "retain corse-grain history and fine-grain 
recent data" in {
+    val store = new CounterMetricsStore(config)
+    val a = Counter(null, 50)
+    val b = Counter(null, 100)
+    val c = Counter(null, 150)
+
+    store.add(a)
+    store.add(b)
+    store.add(c)
+
+    assert(store.readLatest.map(_.value) == List(c))
+    assert(store.readRecent.map(_.value) == List(a))
+    assert(store.readHistory.map(_.value) == List(a))
+  }
+
+  "HistoryMetricsService" should
+    "retain lastest metrics data and allow user to query metrics by path" in {
+    implicit val system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
+    val appId = 0
+    val service = system.actorOf(Props(new HistoryMetricsService("app0", 
config)))
+    service ! Counter("metric.counter", 0)
+    service ! Meter("metric.meter", 0, 0, 0, null)
+    service ! Histogram("metric.histogram", 0, 0, 0, 0, 0, 0)
+
+    val client = TestProbe()
+
+    // Filters metrics with path "metric.counter"
+    client.send(service, QueryHistoryMetrics("metric.counter"))
+    import scala.concurrent.duration._
+    client.expectMsgPF(3.seconds) {
+      case history: HistoryMetrics =>
+        assert(history.path == "metric.counter")
+        val metricList = history.metrics
+        metricList.foreach(metricItem =>
+          assert(metricItem.value.isInstanceOf[Counter])
+        )
+    }
+
+    // Filters metrics with path "metric.meter"
+    client.send(service, QueryHistoryMetrics("metric.meter"))
+    client.expectMsgPF(3.seconds) {
+      case history: HistoryMetrics =>
+        assert(history.path == "metric.meter")
+        val metricList = history.metrics
+        metricList.foreach(metricItem =>
+          assert(metricItem.value.isInstanceOf[Meter])
+        )
+    }
+
+    // Filters metrics with path "metric.histogram"
+    client.send(service, QueryHistoryMetrics("metric.histogram"))
+    client.expectMsgPF(3.seconds) {
+      case history: HistoryMetrics =>
+        assert(history.path == "metric.histogram")
+        val metricList = history.metrics
+        metricList.foreach(metricItem =>
+          assert(metricItem.value.isInstanceOf[Histogram])
+        )
+    }
+
+    // Filters metrics with path prefix "metric", all metrics which can
+    // match the path prefix will be retained.
+    client.send(service, QueryHistoryMetrics("metric"))
+    client.expectMsgPF(3.seconds) {
+      case history: HistoryMetrics =>
+        val metricList = history.metrics
+
+        var counterFound = false
+        var meterFound = false
+        var histogramFound = false
+
+        metricList.foreach(metricItem =>
+          metricItem.value match {
+            case v: Counter => counterFound = true
+            case v: Meter => meterFound = true
+            case v: Histogram => histogramFound = true
+            case _ => // Skip
+          }
+        )
+
+        // All kinds of metric types are preserved.
+        assert(counterFound && meterFound && histogramFound)
+    }
+
+    system.terminate()
+    Await.result(system.whenTerminated, Duration.Inf)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/JarSchedulerSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/JarSchedulerSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/JarSchedulerSpec.scala
new file mode 100644
index 0000000..5f6dd04
--- /dev/null
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/JarSchedulerSpec.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.appmaster
+
+import akka.actor.ActorSystem
+import org.apache.gearpump.cluster.scheduler.{Resource, ResourceRequest}
+import org.apache.gearpump.cluster.worker.WorkerId
+import org.apache.gearpump.cluster.{AppJar, TestUtil}
+import org.apache.gearpump.jarstore.FilePath
+import org.apache.gearpump.partitioner.{HashPartitioner, Partitioner}
+import org.apache.gearpump.streaming.appmaster.TaskSchedulerSpec.{TestTask1, 
TestTask2}
+import org.apache.gearpump.streaming.task.TaskId
+import org.apache.gearpump.streaming.{DAG, ProcessorDescription, _}
+import org.apache.gearpump.util.Graph
+import org.apache.gearpump.util.Graph._
+import org.scalatest.{Matchers, WordSpec}
+
+import scala.concurrent.{Await, Future}
+
+class JarSchedulerSpec extends WordSpec with Matchers {
+  val mockJar1 = AppJar("jar1", FilePath("path"))
+  val mockJar2 = AppJar("jar2", FilePath("path"))
+  val task1 = ProcessorDescription(id = 0, taskClass = 
classOf[TestTask1].getName, parallelism = 1,
+    jar = mockJar1)
+  val task2 = ProcessorDescription(id = 1, taskClass = 
classOf[TestTask2].getName, parallelism = 1,
+    jar = mockJar1)
+  val task3 = ProcessorDescription(id = 2, taskClass = 
classOf[TestTask2].getName, parallelism = 2,
+    jar = mockJar2)
+  val dag = DAG(Graph(task1 ~ Partitioner[HashPartitioner] ~> task2))
+
+  import scala.concurrent.duration._
+
+  "JarScheduler" should {
+    "schedule tasks depends on app jar" in {
+      val system = ActorSystem("JarSchedulerSpec")
+      implicit val dispatcher = system.dispatcher
+      val manager = new JarScheduler(0, "APP", TestUtil.DEFAULT_CONFIG, system)
+      manager.setDag(dag, Future {
+        0L
+      })
+      val requests = Array(ResourceRequest(Resource(2), WorkerId.unspecified))
+      val result = Await.result(manager.getResourceRequestDetails(), 
15.seconds)
+      assert(result.length == 1)
+      assert(result.head.jar == mockJar1)
+      assert(result.head.requests.deep == requests.deep)
+
+      val tasks = Await.result(manager.scheduleTask(mockJar1, WorkerId(0, 0L), 
0,
+        Resource(2)), 15.seconds)
+      assert(tasks.contains(TaskId(0, 0)))
+      assert(tasks.contains(TaskId(1, 0)))
+
+      val newDag = replaceDAG(dag, 1, task3, 1)
+
+      manager.setDag(newDag, Future {
+        0
+      })
+      val requestDetails = Await.result(manager.getResourceRequestDetails().
+        map(_.sortBy(_.jar.name)), 15.seconds)
+      assert(requestDetails.length == 2)
+      assert(requestDetails.last.jar == mockJar2)
+      assert(requestDetails.last.requests.deep == requests.deep)
+
+      system.terminate()
+      Await.result(system.whenTerminated, Duration.Inf)
+    }
+  }
+
+  def replaceDAG(
+      dag: DAG, oldProcessorId: ProcessorId, newProcessor: 
ProcessorDescription, newVersion: Int)
+    : DAG = {
+    val oldProcessorLife = LifeTime(dag.processors(oldProcessorId).life.birth,
+      newProcessor.life.birth)
+    val newProcessorMap = dag.processors ++
+      Map(oldProcessorId -> dag.processors(oldProcessorId).copy(life = 
oldProcessorLife),
+        newProcessor.id -> newProcessor)
+    val newGraph = dag.graph.subGraph(oldProcessorId).
+      replaceVertex(oldProcessorId, newProcessor.id).addGraph(dag.graph)
+    new DAG(newVersion, newProcessorMap, newGraph)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskLocatorSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskLocatorSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskLocatorSpec.scala
new file mode 100644
index 0000000..4341041
--- /dev/null
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskLocatorSpec.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.appmaster
+
+import org.apache.gearpump.cluster.worker.WorkerId
+import org.apache.gearpump.streaming.appmaster.TaskLocator.Localities
+import org.apache.gearpump.streaming.task.TaskId
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+
+// Round-trip test for the JSON (de)serialization of task locality settings.
+class TaskLocatorSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
+  it should "serialize/deserialize correctly" in {
+    val localities = new Localities(Map(WorkerId(0, 0L) -> Array(TaskId(0, 1), 
TaskId(1, 2))))
+    Localities.toJson(localities)
+
+    // Compare via Lists because Array equality is reference-based.
+    localities.localities.mapValues(_.toList) shouldBe
+      
Localities.fromJson(Localities.toJson(localities)).localities.mapValues(_.toList)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala
new file mode 100644
index 0000000..9765278
--- /dev/null
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.appmaster
+
+import akka.actor.{ActorRef, ActorSystem, Props}
+import akka.testkit.TestProbe
+import 
org.apache.gearpump.cluster.MasterToAppMaster.ReplayFromTimestampWindowTrailingEdge
+import org.apache.gearpump.cluster.scheduler.{Resource, ResourceRequest}
+import org.apache.gearpump.cluster.worker.WorkerId
+import org.apache.gearpump.cluster.{AppJar, TestUtil, UserConfig}
+import org.apache.gearpump.jarstore.FilePath
+import org.apache.gearpump.partitioner.{HashPartitioner, Partitioner, 
PartitionerDescription}
+import org.apache.gearpump.streaming.AppMasterToExecutor.{LaunchTasks, 
StartAllTasks, StartDynamicDag, TaskLocationsReady, TaskLocationsReceived, 
TaskRegistered}
+import org.apache.gearpump.streaming.ExecutorToAppMaster.RegisterTask
+import 
org.apache.gearpump.streaming.appmaster.AppMaster.AllocateResourceTimeOut
+import org.apache.gearpump.streaming.appmaster.ClockService.{ChangeToNewDAG, 
ChangeToNewDAGSuccess}
+import org.apache.gearpump.streaming.appmaster.DagManager.{GetLatestDAG, 
GetTaskLaunchData, LatestDAG, NewDAGDeployed, TaskLaunchData, WatchChange}
+import 
org.apache.gearpump.streaming.appmaster.ExecutorManager.{ExecutorResourceUsageSummary,
 SetTaskManager, StartExecutors, _}
+import 
org.apache.gearpump.streaming.appmaster.JarScheduler.ResourceRequestDetail
+import org.apache.gearpump.streaming.appmaster.TaskManagerSpec.{Env, Task1, 
Task2}
+import org.apache.gearpump.streaming.executor.Executor.RestartTasks
+import org.apache.gearpump.streaming.task.{StartTime, TaskContext, _}
+import org.apache.gearpump.streaming.{DAG, LifeTime, ProcessorDescription, 
ProcessorId}
+import org.apache.gearpump.transport.HostPort
+import org.apache.gearpump.util.Graph
+import org.apache.gearpump.util.Graph._
+import org.apache.gearpump.{Message, TimeStamp}
+import org.mockito.Mockito._
+import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
+
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, Future}
+
+// Drives TaskManager through its full startup handshake (see bootUp) using
+// TestProbe stand-ins for DagManager, ExecutorManager, ClockService and the
+// AppMaster, then exercises the two recovery paths plus two small helpers
+// (TaskChangeRegistry and the DAG-migration diff).
+class TaskManagerSpec extends FlatSpec with Matchers with BeforeAndAfterEach {
+
+  implicit var system: ActorSystem = null
+
+  val task1Class = classOf[Task1].getName
+  val task2Class = classOf[Task2].getName
+
+  // Both processors are packaged in the same mock jar.
+  val mockJar = AppJar("jar_for_test", FilePath("path"))
+  val task1 = ProcessorDescription(id = 0, taskClass = task1Class, parallelism 
= 1, jar = mockJar)
+  val task2 = ProcessorDescription(id = 1, taskClass = task2Class, parallelism 
= 1, jar = mockJar)
+
+  val dag: DAG = DAG(Graph(task1 ~ Partitioner[HashPartitioner] ~> task2))
+  val dagVersion = 0
+
+  val task1LaunchData = TaskLaunchData(task1, Subscriber.of(processorId = 0, 
dag))
+  val task2LaunchData = TaskLaunchData(task2, Subscriber.of(processorId = 1, 
dag))
+
+  val appId = 0
+
+  val resource = Resource(2)
+  val workerId = WorkerId(0, 0L)
+  val executorId = 0
+
+  // A fresh actor system per test keeps probe/actor state isolated.
+  override def beforeEach(): Unit = {
+    system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
+  }
+
+  override def afterEach(): Unit = {
+    system.terminate()
+    Await.result(system.whenTerminated, Duration.Inf)
+  }
+
+  it should "recover by requesting new executors when executor stopped 
unexpectedly" in {
+    val env = bootUp
+    import env._
+    implicit val dispatcher = system.dispatcher
+
+    val resourceRequest = Array(ResourceRequest(resource, workerId))
+    when(scheduler.executorFailed(executorId)).thenReturn(Future {
+      Some(ResourceRequestDetail(mockJar,
+        resourceRequest))
+    })
+
+    taskManager ! ExecutorStopped(executorId)
+
+    // When one executor stops, it also triggers recovery by restarting the
+    // existing executors
+    executorManager.expectMsg(BroadCast(RestartTasks(dagVersion)))
+
+    // Asks for new executors
+    val returned = 
executorManager.receiveN(1).head.asInstanceOf[StartExecutors]
+    assert(returned.resources.deep == resourceRequest.deep)
+    executorManager.reply(StartExecutorsTimeOut)
+
+    // TaskManager cannot handle the TimeOut error itself, escalate to 
appmaster.
+    appMaster.expectMsg(AllocateResourceTimeOut)
+  }
+
+  it should "recover by restarting existing executors when message loss 
happen" in {
+    val env = bootUp
+    import env._
+
+    taskManager ! ReplayFromTimestampWindowTrailingEdge(appId)
+
+    // Restart the executors so that we can replay from minClock
+    executorManager.expectMsg(BroadCast(RestartTasks(dagVersion)))
+  }
+
+  import org.apache.gearpump.streaming.appmaster.TaskManager.TaskChangeRegistry
+  "TaskChangeRegistry" should "track all modified task registration" in {
+    val tasks = List(TaskId(0, 0), TaskId(0, 1))
+    val registry = new TaskChangeRegistry(tasks)
+    registry.taskChanged(TaskId(0, 0))
+    registry.taskChanged(TaskId(0, 1))
+    // Once every expected task has reported a change, the registry is done.
+    assert(registry.allTaskChanged)
+  }
+
+  "DAGDiff" should "track all the DAG migration impact" in {
+    val defaultEdge = PartitionerDescription(null)
+    val a = ProcessorDescription(id = 1, taskClass = null, parallelism = 1)
+    val b = ProcessorDescription(id = 2, taskClass = null, parallelism = 1)
+    val c = ProcessorDescription(id = 3, taskClass = null, parallelism = 1)
+    val left = Graph(a ~ defaultEdge ~> b, a ~ defaultEdge ~> c)
+
+    // right = left + new processor d downstream of c, and a's life modified.
+    val d = ProcessorDescription(id = 4, taskClass = null, parallelism = 1)
+    val right = left.copy
+    right.addVertex(d)
+    right.addEdge(c, defaultEdge, d)
+    val e = a.copy(life = LifeTime(0, 0))
+    right.replaceVertex(a, e)
+
+    val diff = TaskManager.migrate(DAG(left), DAG(right, version = 1))
+    diff.addedProcessors shouldBe List(d.id)
+
+    diff.modifiedProcessors shouldBe List(a.id)
+
+    // c gained a new downstream processor, so it is the impacted upstream.
+    diff.impactedUpstream shouldBe List(c.id)
+  }
+
+  // Walks TaskManager through the complete DynamicDAG startup sequence
+  // (Step1..Step14 below) and hands back the probes for further interaction.
+  private def bootUp: Env = {
+
+    implicit val dispatcher = system.dispatcher
+
+    val executorManager = TestProbe()
+    val clockService = TestProbe()
+    val appMaster = TestProbe()
+    val executor = TestProbe()
+
+    val scheduler = mock(classOf[JarScheduler])
+
+    val dagManager = TestProbe()
+
+    val taskManager = system.actorOf(
+      Props(new TaskManager(appId, dagManager.ref, scheduler, 
executorManager.ref, clockService.ref,
+        appMaster.ref, "appName")))
+
+    dagManager.expectMsgType[WatchChange]
+    executorManager.expectMsgType[SetTaskManager]
+
+    // Step1: first transition from Uninitialized to ApplicationReady
+    executorManager.expectMsgType[ExecutorResourceUsageSummary]
+    dagManager.expectMsgType[NewDAGDeployed]
+
+    // Step2: Get Additional Resource Request
+    when(scheduler.getResourceRequestDetails())
+      .thenReturn(Future {
+        Array(ResourceRequestDetail(mockJar, Array(ResourceRequest(resource,
+          WorkerId.unspecified))))
+      })
+
+    // Step3: DAG changed. Start transit from ApplicationReady -> DynamicDAG
+    dagManager.expectMsg(GetLatestDAG)
+    dagManager.reply(LatestDAG(dag))
+
+    // Step4: Start remote Executors.
+    // received Broadcast
+    executorManager.expectMsg(BroadCast(StartDynamicDag(dag.version)))
+    executorManager.expectMsgType[StartExecutors]
+
+    when(scheduler.scheduleTask(mockJar, workerId, executorId, resource))
+      .thenReturn(Future(List(TaskId(0, 0), TaskId(1, 0))))
+
+    // Step5: Executor is started.
+    executorManager.reply(ExecutorStarted(executorId, resource, workerId, 
Some(mockJar)))
+
+    // Step6: Prepare to start Task. First GetTaskLaunchData.
+    val taskLaunchData: PartialFunction[Any, TaskLaunchData] = {
+      case GetTaskLaunchData(_, 0, executorStarted) =>
+        task1LaunchData.copy(context = executorStarted)
+      case GetTaskLaunchData(_, 1, executorStarted) =>
+        task2LaunchData.copy(context = executorStarted)
+    }
+
+    val launchData1 = dagManager.expectMsgPF()(taskLaunchData)
+    dagManager.reply(launchData1)
+
+    val launchData2 = dagManager.expectMsgPF()(taskLaunchData)
+    dagManager.reply(launchData2)
+
+    // Step7: Launch Task
+    val launchTaskMatch: PartialFunction[Any, RegisterTask] = {
+      case UniCast(executorId, launch: LaunchTasks) =>
+        RegisterTask(launch.taskId.head, executorId, 
HostPort("127.0.0.1:3000"))
+    }
+
+    // TaskManager should return the latest start clock to task(0,0)
+    clockService.expectMsg(GetStartClock)
+    clockService.reply(StartClock(0))
+
+    // Step8: Task is started. registerTask.
+    val registerTask1 = executorManager.expectMsgPF()(launchTaskMatch)
+    taskManager.tell(registerTask1, executor.ref)
+    executor.expectMsgType[TaskRegistered]
+
+    val registerTask2 = executorManager.expectMsgPF()(launchTaskMatch)
+    taskManager.tell(registerTask2, executor.ref)
+    executor.expectMsgType[TaskRegistered]
+
+    // Step9: start broadcasting TaskLocations.
+    import scala.concurrent.duration._
+    assert(executorManager.expectMsgPF(5.seconds) {
+      case BroadCast(startAllTasks) => 
startAllTasks.isInstanceOf[TaskLocationsReady]
+    })
+
+    // Step10: Executor confirm it has received TaskLocationsReceived(version, 
executorId)
+    taskManager.tell(TaskLocationsReceived(dag.version, executorId), 
executor.ref)
+
+    // Step11: Tell ClockService to update DAG.
+    clockService.expectMsgType[ChangeToNewDAG]
+    clockService.reply(ChangeToNewDAGSuccess(Map.empty[ProcessorId, 
TimeStamp]))
+
+    // Step12: start all tasks
+    import scala.concurrent.duration._
+    assert(executorManager.expectMsgPF(5.seconds) {
+      case BroadCast(startAllTasks) => 
startAllTasks.isInstanceOf[StartAllTasks]
+    })
+
+    // Step13: Tell ExecutorManager the updated usage status of executors.
+    executorManager.expectMsgType[ExecutorResourceUsageSummary]
+
+    // Step14: transition from DynamicDAG to ApplicationReady
+    Env(executorManager, clockService, appMaster, executor, taskManager, 
scheduler)
+  }
+}
+
+// Test fixtures: the probe bundle returned by bootUp, and two no-op Task
+// implementations that are referenced by class name in the processor
+// descriptions above.
+object TaskManagerSpec {
+  case class Env(
+      executorManager: TestProbe,
+      clockService: TestProbe,
+      appMaster: TestProbe,
+      executor: TestProbe,
+      taskManager: ActorRef,
+      scheduler: JarScheduler)
+
+  class Task1(taskContext: TaskContext, userConf: UserConfig)
+    extends Task(taskContext, userConf) {
+    override def onStart(startTime: StartTime): Unit = {}
+    override def onNext(msg: Message): Unit = {}
+  }
+
+  class Task2(taskContext: TaskContext, userConf: UserConfig)
+    extends Task(taskContext, userConf) {
+    override def onStart(startTime: StartTime): Unit = {}
+    override def onNext(msg: Message): Unit = {}
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskRegistrySpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskRegistrySpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskRegistrySpec.scala
new file mode 100644
index 0000000..6f85834
--- /dev/null
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskRegistrySpec.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.appmaster
+
+import org.apache.gearpump.cluster.scheduler.Resource
+import org.apache.gearpump.streaming.appmaster.TaskRegistry.{Accept, Reject, 
TaskLocation, TaskLocations}
+import org.apache.gearpump.streaming.task.TaskId
+import org.apache.gearpump.transport.HostPort
+import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
+// Exercises TaskRegistry bookkeeping: registration accept/reject, per-host
+// location lookup, executor mapping and aggregated resource accounting.
+class TaskRegistrySpec extends FlatSpec with Matchers with BeforeAndAfterEach {
+
+  it should "maintain registered tasks" in {
+    val task0 = TaskId(0, 0)
+    val task1 = TaskId(0, 1)
+    val task2 = TaskId(0, 2)
+
+    val register = new TaskRegistry(expectedTasks = List(task0, task1, task2))
+    val host1 = HostPort("127.0.0.1:3000")
+    val host2 = HostPort("127.0.0.1:3001")
+
+    val executorId = 0
+    assert(Accept == register.registerTask(task0, TaskLocation(executorId, 
host1)))
+    assert(Accept == register.registerTask(task1, TaskLocation(executorId, 
host1)))
+    assert(Accept == register.registerTask(task2, TaskLocation(executorId, 
host2)))
+
+    // A task that was never in the expected list must be rejected.
+    assert(Reject == register.registerTask(TaskId(100, 0), 
TaskLocation(executorId, host2)))
+
+    assert(register.isAllTasksRegistered)
+    val TaskLocations(taskLocations) = register.getTaskLocations
+    val tasksOnHost1 = taskLocations.get(host1).get
+    val tasksOnHost2 = taskLocations.get(host2).get
+    assert(tasksOnHost1.contains(task0))
+    assert(tasksOnHost1.contains(task1))
+    assert(tasksOnHost2.contains(task2))
+
+    assert(register.getExecutorId(task0) == Some(executorId))
+    assert(register.isTaskRegisteredForExecutor(executorId))
+
+    register.processorExecutors(0) shouldBe Map(
+      executorId -> List(task0, task1, task2)
+    )
+
+    // Three registered tasks are accounted as Resource(3) on the executor.
+    register.usedResource.resources shouldBe Map(
+      executorId -> Resource(3)
+    )
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskSchedulerSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskSchedulerSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskSchedulerSpec.scala
new file mode 100644
index 0000000..4a532dd
--- /dev/null
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskSchedulerSpec.scala
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.appmaster
+
+import com.typesafe.config.ConfigFactory
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.scheduler.{Relaxation, Resource, 
ResourceRequest}
+import org.apache.gearpump.cluster.worker.WorkerId
+import org.apache.gearpump.cluster.{TestUtil, UserConfig}
+import org.apache.gearpump.partitioner.{HashPartitioner, Partitioner}
+import org.apache.gearpump.streaming.appmaster.TaskLocator.Localities
+import org.apache.gearpump.streaming.appmaster.TaskSchedulerSpec.{TestTask1, 
TestTask2}
+import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext, 
TaskId}
+import org.apache.gearpump.streaming.{DAG, ProcessorDescription}
+import org.apache.gearpump.util.Graph
+import org.apache.gearpump.util.Graph._
+import org.scalatest.{Matchers, WordSpec}
+
+import scala.collection.mutable.ArrayBuffer
+
+// Verifies TaskSchedulerImpl: locality-driven placement taken from the user
+// configuration, rescheduling after an executor failure, and fair spreading
+// of tasks across processors.
+class TaskSchedulerSpec extends WordSpec with Matchers {
+  val task1 = ProcessorDescription(id = 0, taskClass = 
classOf[TestTask1].getName, parallelism = 4)
+  val task2 = ProcessorDescription(id = 1, taskClass = 
classOf[TestTask2].getName, parallelism = 2)
+
+  val dag = DAG(Graph(task1 ~ Partitioner[HashPartitioner] ~> task2))
+
+  val config = TestUtil.DEFAULT_CONFIG
+
+  "TaskScheduler" should {
+    "schedule tasks on different workers properly according user's 
configuration" in {
+
+      // Pin 4 tasks to worker 1 and 2 tasks to worker 2 via locality config.
+      val localities = Localities(
+        Map(WorkerId(1, 0L) -> Array(TaskId(0, 0), TaskId(0, 1), TaskId(1, 0), 
TaskId(1, 1)),
+          WorkerId(2, 0L) -> Array(TaskId(0, 2), TaskId(0, 3))
+        ))
+
+      val localityConfig = 
ConfigFactory.parseString(Localities.toJson(localities))
+
+      import 
org.apache.gearpump.streaming.Constants.GEARPUMP_STREAMING_LOCALITIES
+      val appName = "app"
+      val taskScheduler = new TaskSchedulerImpl(appId = 0, appName,
+        config.withValue(s"$GEARPUMP_STREAMING_LOCALITIES.$appName", 
localityConfig.root))
+
+      // Locality pinning should surface as SPECIFICWORKER resource requests.
+      val expectedRequests =
+        Array(ResourceRequest(Resource(4), WorkerId(1, 0L), relaxation = 
Relaxation.SPECIFICWORKER),
+          ResourceRequest(Resource(2), WorkerId(2, 0L), relaxation = 
Relaxation.SPECIFICWORKER))
+
+      taskScheduler.setDAG(dag)
+      val resourceRequests = taskScheduler.getResourceRequests()
+
+      val acturalRequests = resourceRequests.sortBy(_.resource.slots)
+      
assert(acturalRequests.sameElements(expectedRequests.sortBy(_.resource.slots)))
+
+      val tasksOnWorker1 = ArrayBuffer[Int]()
+      val tasksOnWorker2 = ArrayBuffer[Int]()
+      for (i <- 0 until 4) {
+        tasksOnWorker1.append(taskScheduler.schedule(WorkerId(1, 0L),
+          executorId = 0, Resource(1)).head.processorId)
+      }
+      for (i <- 0 until 2) {
+        tasksOnWorker2.append(taskScheduler.schedule(WorkerId(2, 0L), 
executorId = 1,
+          Resource(1)).head.processorId)
+      }
+
+      // Allocates more resource, and no tasks to launch
+      assert(taskScheduler.schedule(WorkerId(3, 0L), executorId = 3,
+        Resource(1)) == List.empty[TaskId])
+
+      // On worker1, executor 0
+      assert(tasksOnWorker1.sorted.sameElements(Array(0, 0, 1, 1)))
+
+      // On worker2, executor 1, Task(0, 0), Task(0, 1)
+      assert(tasksOnWorker2.sorted.sameElements(Array(0, 0)))
+
+      // Failing executor 1 should re-request its 2 slots, now with the
+      // ONEWORKER relaxation on an unspecified worker.
+      val rescheduledResources = taskScheduler.executorFailed(executorId = 1)
+
+      
assert(rescheduledResources.sameElements(Array(ResourceRequest(Resource(2),
+        WorkerId.unspecified, relaxation = Relaxation.ONEWORKER))))
+
+      val launchedTask = taskScheduler.schedule(WorkerId(3, 0L), executorId = 
3, Resource(2))
+
+      // Starts the failed 2 tasks Task(0, 0) and Task(0, 1)
+      assert(launchedTask.length == 2)
+    }
+
+    "schedule task fairly" in {
+      val appName = "app"
+      val taskScheduler = new TaskSchedulerImpl(appId = 0, appName, config)
+
+      // NOTE(review): expectedRequests is unused in this test case.
+      val expectedRequests =
+        Array(ResourceRequest(Resource(4), WorkerId(1, 0L), relaxation = 
Relaxation.SPECIFICWORKER),
+          ResourceRequest(Resource(2), WorkerId(2, 0L), relaxation = 
Relaxation.SPECIFICWORKER))
+
+      taskScheduler.setDAG(dag)
+      // With 4 slots on one worker, the scheduled tasks split 2 + 2 across
+      // the two processors rather than exhausting one processor first.
+      val tasks = taskScheduler.schedule(WorkerId(1, 0L), executorId = 0, 
Resource(4))
+      assert(tasks.filter(_.processorId == 0).length == 2)
+      assert(tasks.filter(_.processorId == 1).length == 2)
+    }
+  }
+}
+
+// No-op task implementations referenced by class name in the processor
+// descriptions of TaskSchedulerSpec (and reused by JarSchedulerSpec).
+object TaskSchedulerSpec {
+  class TestTask1(taskContext: TaskContext, userConf: UserConfig)
+    extends Task(taskContext, userConf) {
+    override def onStart(startTime: StartTime): Unit = Unit
+    override def onNext(msg: Message): Unit = Unit
+  }
+
+  class TestTask2(taskContext: TaskContext, userConf: UserConfig)
+    extends Task(taskContext, userConf) {
+    override def onStart(startTime: StartTime): Unit = Unit
+    override def onNext(msg: Message): Unit = Unit
+  }
+}

Reply via email to