Repository: incubator-gearpump
Updated Branches:
  refs/heads/master 757dd367e -> ae56903da


fix GEARPUMP-61, improve code coverage

Author: manuzhang <[email protected]>

Closes #43 from manuzhang/GEARPUMP-61.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/ae56903d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/ae56903d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/ae56903d

Branch: refs/heads/master
Commit: ae56903da7d3ae352eb919ace323e0efabd34128
Parents: 757dd36
Author: manuzhang <[email protected]>
Authored: Wed Jun 22 16:23:55 2016 -0700
Committer: Kam Kasravi <[email protected]>
Committed: Wed Jun 22 16:23:55 2016 -0700

----------------------------------------------------------------------
 .../org/apache/gearpump/util/ActorUtil.scala    |   3 +-
 .../gearpump/util/HistoryMetricsService.scala   |   3 -
 .../apache/gearpump/cluster/MasterHarness.scala |   2 +-
 .../state/processor/CountProcessorSpec.scala    |   6 +-
 .../processor/WindowAverageProcessorSpec.scala  |   6 +-
 .../processor/StormBoltOutputCollector.scala    |   4 +-
 .../apache/gearpump/streaming/Constants.scala   |   3 +
 .../streaming/appmaster/AppMaster.scala         | 123 +++++-----
 .../streaming/appmaster/ClockService.scala      |  33 ++-
 .../streaming/appmaster/TaskManager.scala       |  11 +-
 .../gearpump/streaming/executor/Executor.scala  |   8 +-
 .../executor/ExecutorRestartPolicy.scala        |  19 +-
 .../streaming/state/api/PersistentTask.scala    |   4 +-
 .../streaming/task/TaskControlMessage.scala     |  10 +-
 .../streaming/appmaster/AppMasterSpec.scala     | 229 +++++++++++++------
 .../streaming/appmaster/ClockServiceSpec.scala  |  12 +-
 .../streaming/appmaster/TaskManagerSpec.scala   |  14 +-
 17 files changed, 284 insertions(+), 206 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ae56903d/core/src/main/scala/org/apache/gearpump/util/ActorUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/util/ActorUtil.scala 
b/core/src/main/scala/org/apache/gearpump/util/ActorUtil.scala
index 749cdf2..09f2969 100644
--- a/core/src/main/scala/org/apache/gearpump/util/ActorUtil.scala
+++ b/core/src/main/scala/org/apache/gearpump/util/ActorUtil.scala
@@ -54,11 +54,10 @@ object ActorUtil {
 
   def defaultMsgHandler(actor: ActorRef): Receive = {
     case msg: Any =>
-      LOG.error(s"Cannot find a matching message, ${msg.getClass.toString}, 
forwarded from $actor")
+      LOG.error(s"Cannot find a matching message, $msg, forwarded from $actor")
   }
 
   def printActorSystemTree(system: ActorSystem): Unit = {
-    val extendedSystem = system.asInstanceOf[ExtendedActorSystem]
     val clazz = system.getClass
     val m = clazz.getDeclaredMethod("printTree")
     m.setAccessible(true)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ae56903d/core/src/main/scala/org/apache/gearpump/util/HistoryMetricsService.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/gearpump/util/HistoryMetricsService.scala 
b/core/src/main/scala/org/apache/gearpump/util/HistoryMetricsService.scala
index 549c34f..ee59678 100644
--- a/core/src/main/scala/org/apache/gearpump/util/HistoryMetricsService.scala
+++ b/core/src/main/scala/org/apache/gearpump/util/HistoryMetricsService.scala
@@ -358,9 +358,6 @@ object HistoryMetricsService {
 
   class GaugeMetricsStore(config: HistoryMetricsConfig) extends 
HistoryMetricsStore {
 
-    private val compartor = (left: HistoryMetricsItem, right: 
HistoryMetricsItem) =>
-      left.value.asInstanceOf[Gauge].value > 
right.value.asInstanceOf[Gauge].value
-
     private val history = new SingleValueMetricsStore(
       config.retainHistoryDataHours * 3600 * 1000 / 
config.retainHistoryDataIntervalMs,
       config.retainHistoryDataIntervalMs)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ae56903d/core/src/test/scala/org/apache/gearpump/cluster/MasterHarness.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/gearpump/cluster/MasterHarness.scala 
b/core/src/test/scala/org/apache/gearpump/cluster/MasterHarness.scala
index f2f374e..1ec5660 100644
--- a/core/src/test/scala/org/apache/gearpump/cluster/MasterHarness.scala
+++ b/core/src/test/scala/org/apache/gearpump/cluster/MasterHarness.scala
@@ -43,7 +43,7 @@ trait MasterHarness {
   private var systemAddress: Address = null
   private var host: String = null
   private var port: Int = 0
-  private var masterProperties = new Properties()
+  private val masterProperties = new Properties()
   val PROCESS_BOOT_TIME = Duration(25, TimeUnit.SECONDS)
 
   def getActorSystem: ActorSystem = system

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ae56903d/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala
 
b/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala
index cdc8cb2..6048034 100644
--- 
a/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala
+++ 
b/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala
@@ -33,7 +33,7 @@ import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.MockUtil
 import org.apache.gearpump.streaming.state.api.PersistentTask
 import 
org.apache.gearpump.streaming.state.impl.{InMemoryCheckpointStoreFactory, 
PersistentStateConfig}
-import org.apache.gearpump.streaming.task.{ReportCheckpointClock, StartTime}
+import org.apache.gearpump.streaming.task.{UpdateCheckpointClock, StartTime}
 import org.apache.gearpump.streaming.transaction.api.CheckpointStoreFactory
 
 class CountProcessorSpec extends PropSpec with PropertyChecks with Matchers {
@@ -60,7 +60,7 @@ class CountProcessorSpec extends PropSpec with PropertyChecks 
with Matchers {
         when(taskContext.appMaster).thenReturn(appMaster.ref)
 
         count.onStart(StartTime(0L))
-        appMaster.expectMsg(ReportCheckpointClock(taskContext.taskId, 0L))
+        appMaster.expectMsg(UpdateCheckpointClock(taskContext.taskId, 0L))
 
         for (i <- 0L to num) {
           count.onNext(Message("", i))
@@ -75,7 +75,7 @@ class CountProcessorSpec extends PropSpec with PropertyChecks 
with Matchers {
         when(taskContext.upstreamMinClock).thenReturn(num)
         count.onNext(PersistentTask.CHECKPOINT)
         // Only the state before checkpoint time is checkpointed
-        appMaster.expectMsg(ReportCheckpointClock(taskContext.taskId, num))
+        appMaster.expectMsg(UpdateCheckpointClock(taskContext.taskId, num))
     }
 
     system.terminate()

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ae56903d/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/WindowAverageProcessorSpec.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/WindowAverageProcessorSpec.scala
 
b/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/WindowAverageProcessorSpec.scala
index a488c9f..0963429 100644
--- 
a/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/WindowAverageProcessorSpec.scala
+++ 
b/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/WindowAverageProcessorSpec.scala
@@ -34,7 +34,7 @@ import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.MockUtil
 import org.apache.gearpump.streaming.state.api.PersistentTask
 import 
org.apache.gearpump.streaming.state.impl.{InMemoryCheckpointStoreFactory, 
PersistentStateConfig, WindowConfig}
-import org.apache.gearpump.streaming.task.{ReportCheckpointClock, StartTime}
+import org.apache.gearpump.streaming.task.{UpdateCheckpointClock, StartTime}
 import org.apache.gearpump.streaming.transaction.api.CheckpointStoreFactory
 
 class WindowAverageProcessorSpec extends PropSpec with PropertyChecks with 
Matchers {
@@ -62,7 +62,7 @@ class WindowAverageProcessorSpec extends PropSpec with 
PropertyChecks with Match
         when(taskContext.appMaster).thenReturn(appMaster.ref)
 
         windowAverage.onStart(StartTime(0L))
-        appMaster.expectMsg(ReportCheckpointClock(taskContext.taskId, 0L))
+        appMaster.expectMsg(UpdateCheckpointClock(taskContext.taskId, 0L))
 
         for (i <- 0L until num) {
           windowAverage.onNext(Message("" + data, i))
@@ -77,7 +77,7 @@ class WindowAverageProcessorSpec extends PropSpec with 
PropertyChecks with Match
         // Time to checkpoint
         when(taskContext.upstreamMinClock).thenReturn(num)
         windowAverage.onNext(PersistentTask.CHECKPOINT)
-        appMaster.expectMsg(ReportCheckpointClock(taskContext.taskId, num))
+        appMaster.expectMsg(UpdateCheckpointClock(taskContext.taskId, num))
     }
 
     system.terminate()

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ae56903d/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/processor/StormBoltOutputCollector.scala
----------------------------------------------------------------------
diff --git 
a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/processor/StormBoltOutputCollector.scala
 
b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/processor/StormBoltOutputCollector.scala
index a70ce48..f60cc49 100644
--- 
a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/processor/StormBoltOutputCollector.scala
+++ 
b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/processor/StormBoltOutputCollector.scala
@@ -25,7 +25,7 @@ import backtype.storm.tuple.Tuple
 import org.apache.gearpump.experiments.storm.topology.TimedTuple
 import org.apache.gearpump.experiments.storm.util.StormConstants._
 import org.apache.gearpump.experiments.storm.util.StormOutputCollector
-import org.apache.gearpump.streaming.task.ReportCheckpointClock
+import org.apache.gearpump.streaming.task.UpdateCheckpointClock
 
 /**
  * this is used by Storm bolt to emit messages
@@ -60,7 +60,7 @@ private[storm] class StormBoltOutputCollector(collector: 
StormOutputCollector,
           val upstreamMinClock = taskContext.upstreamMinClock
           if (reportTime <= upstreamMinClock && upstreamMinClock <= 
maxAckTime) {
             reportTime = upstreamMinClock / CHECKPOINT_INTERVAL_MILLIS * 
CHECKPOINT_INTERVAL_MILLIS
-            taskContext.appMaster ! ReportCheckpointClock(taskContext.taskId, 
reportTime)
+            taskContext.appMaster ! UpdateCheckpointClock(taskContext.taskId, 
reportTime)
             reportTime += CHECKPOINT_INTERVAL_MILLIS
           }
         case _ =>

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ae56903d/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala 
b/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala
index 445f26c..320e46f 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala
@@ -33,4 +33,7 @@ object Constants {
 
   val GEARPUMP_STREAMING_ACK_ONCE_EVERY_MESSAGE_COUNT =
     "gearpump.streaming.ack-once-every-message-count"
+
+  val GEARPUMP_STREAMING_EXECUTOR_RESTART_TIME_WINDOW =
+    "gearpump.streaming.executor-restart-time-window"
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ae56903d/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala
index d0cfc80..d1a03de 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala
@@ -155,21 +155,21 @@ class AppMaster(appContext: AppMasterContext, app: 
AppDescription) extends Appli
       // Checks whether this processor dead, if it is, then we should remove 
it from clockService.
       clockService.foreach(_ forward 
CheckProcessorDeath(unRegister.taskId.processorId))
     case replay: ReplayFromTimestampWindowTrailingEdge =>
-      taskManager.foreach(_ forward replay)
+      if (replay.appId == appId) {
+        taskManager.foreach(_ forward replay)
+      } else {
+        LOG.error(s"replay for invalid appId ${replay.appId}")
+      }
     case messageLoss: MessageLoss =>
       lastFailure = LastFailure(System.currentTimeMillis(), messageLoss.cause)
       taskManager.foreach(_ forward messageLoss)
     case lookupTask: LookupTaskActorRef =>
       taskManager.foreach(_ forward lookupTask)
-    case checkpoint: ReportCheckpointClock =>
-      clockService.foreach(_ forward checkpoint)
     case GetDAG =>
       val task = sender()
       getDAG.foreach {
         dag => task ! dag
       }
-    case GetCheckpointClock =>
-      clockService.foreach(_ forward GetCheckpointClock)
   }
 
   /** Handles messages from Executors */
@@ -182,62 +182,65 @@ class AppMaster(appContext: AppMasterContext, app: 
AppDescription) extends Appli
 
   /** Handles messages from AppMaster */
   def appMasterService: Receive = {
-    case appMasterDataDetailRequest: AppMasterDataDetailRequest =>
-      LOG.debug(s"AppMaster got AppMasterDataDetailRequest for $appId ")
-
-      val executorsFuture = executorBrief
-      val clockFuture = getMinClock
-      val taskFuture = getTaskList
-      val dagFuture = getDAG
-
-      val appMasterDataDetail = for {
-        executors <- executorsFuture
-        clock <- clockFuture
-        tasks <- taskFuture
-        dag <- dagFuture
-      } yield {
-        val graph = dag.graph
-
-        val executorToTasks = tasks.tasks.groupBy(_._2).mapValues {
-          _.keys.toList
-        }
+    case AppMasterDataDetailRequest(rid) =>
+      if (rid == appId) {
+        LOG.info(s"AppMaster got AppMasterDataDetailRequest for $appId ")
+
+        val executorsFuture = executorBrief
+        val clockFuture = getMinClock
+        val taskFuture = getTaskList
+        val dagFuture = getDAG
+
+        val appMasterDataDetail = for {
+          executors <- executorsFuture
+          clock <- clockFuture
+          tasks <- taskFuture
+          dag <- dagFuture
+        } yield {
+          val graph = dag.graph
+
+          val executorToTasks = tasks.tasks.groupBy(_._2).mapValues {
+            _.keys.toList
+          }
 
-        val processors = dag.processors.map { kv =>
-          val processor = kv._2
-          import processor._
-          val tasks = executorToTasks.map { kv =>
-            (kv._1, TaskCount(kv._2.count(_.processorId == id)))
-          }.filter(_._2.count != 0)
-          (id,
-            ProcessorSummary(id, taskClass, parallelism, description, 
taskConf, life,
+          val processors = dag.processors.map { kv =>
+            val processor = kv._2
+            import processor._
+            val tasks = executorToTasks.map { kv =>
+              (kv._1, TaskCount(kv._2.count(_.processorId == id)))
+            }.filter(_._2.count != 0)
+            (id, ProcessorSummary(id, taskClass, parallelism, description, 
taskConf, life,
               tasks.keys.toList, tasks))
-        }
+          }
 
-        StreamAppMasterSummary(
-          appId = appId,
-          appName = app.name,
-          actorPath = address,
-          clock = clock,
-          status = MasterToAppMaster.AppMasterActive,
-          startTime = startTime,
-          uptime = System.currentTimeMillis() - startTime,
-          user = username,
-          homeDirectory = userDir,
-          logFile = logFile.getAbsolutePath,
-          processors = processors,
-          processorLevels = graph.vertexHierarchyLevelMap(),
-          dag = graph.mapEdge { (node1, edge, node2) =>
-            edge.partitionerFactory.name
-          },
-          executors = executors,
-          historyMetricsConfig = getHistoryMetricsConfig
-        )
-      }
+          StreamAppMasterSummary(
+            appId = appId,
+            appName = app.name,
+            actorPath = address,
+            clock = clock,
+            status = MasterToAppMaster.AppMasterActive,
+            startTime = startTime,
+            uptime = System.currentTimeMillis() - startTime,
+            user = username,
+            homeDirectory = userDir,
+            logFile = logFile.getAbsolutePath,
+            processors = processors,
+            processorLevels = graph.vertexHierarchyLevelMap(),
+            dag = graph.mapEdge { (node1, edge, node2) =>
+              edge.partitionerFactory.name
+            },
+            executors = executors,
+            historyMetricsConfig = getHistoryMetricsConfig
+          )
+        }
 
-      val client = sender()
+        val client = sender()
 
-      appMasterDataDetail.map { appData =>
-        client ! appData
+        appMasterDataDetail.map { appData =>
+          client ! appData
+        }
+      } else {
+        LOG.error(s"AppMasterDataDetailRequest for invalid appId $rid")
       }
     // TODO: WebSocket is buggy and disabled.
     //    case appMasterMetricsRequest: AppMasterMetricsRequest =>
@@ -254,8 +257,12 @@ class AppMaster(appContext: AppMasterContext, app: 
AppDescription) extends Appli
       clockService.foreach(_ forward getStalling)
     case replaceDAG: ReplaceProcessor =>
       dagManager forward replaceDAG
-    case GetLastFailure(_) =>
-      sender ! lastFailure
+    case GetLastFailure(id) =>
+      if (id == appId) {
+        sender ! lastFailure
+      } else {
+        LOG.error(s"GetLastFailure for invalid appId $id")
+      }
     case get@GetExecutorSummary(executorId) =>
       val client = sender()
       if (executorId == APPMASTER_DEFAULT_EXECUTOR_ID) {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ae56903d/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala
index bff0f67..6fc0782 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala
@@ -70,8 +70,8 @@ class ClockService(private var dag: DAG, store: AppDataStore) 
extends Actor with
   }
 
   override def postStop(): Unit = {
-    Option(healthCheckScheduler).map(_.cancel)
-    Option(snapshotScheduler).map(_.cancel)
+    Option(healthCheckScheduler).map(_.cancel())
+    Option(snapshotScheduler).map(_.cancel())
   }
 
   // Keep track of clock value of all processors.
@@ -89,7 +89,7 @@ class ClockService(private var dag: DAG, store: AppDataStore) 
extends Actor with
 
   private def checkpointEnabled(processor: ProcessorDescription): Boolean = {
     val taskConf = processor.taskConf
-    taskConf != null && taskConf.getBoolean("state.checkpoint.enable") == 
Some(true)
+    taskConf != null && 
taskConf.getBoolean("state.checkpoint.enable").contains(true)
   }
 
   private def resetCheckpointClocks(dag: DAG, startClock: TimeStamp): Unit = {
@@ -119,10 +119,10 @@ class ClockService(private var dag: DAG, store: 
AppDataStore) extends Actor with
       }
 
     this.upstreamClocks = clocks.map { pair =>
-      val (processorId, processor) = pair
+      val (processorId, _) = pair
 
       val upstreams = dag.graph.incomingEdgesOf(processorId).map(_._1)
-      val upstreamClocks = upstreams.flatMap(clocks.get(_))
+      val upstreamClocks = upstreams.flatMap(clocks.get)
       (processorId, upstreamClocks.toArray)
     }
 
@@ -148,15 +148,15 @@ class ClockService(private var dag: DAG, store: 
AppDataStore) extends Actor with
     this.clocks = newClocks
 
     this.upstreamClocks = newClocks.map { pair =>
-      val (processorId, processor) = pair
+      val (processorId, _) = pair
 
       val upstreams = dag.graph.incomingEdgesOf(processorId).map(_._1)
-      val upstreamClocks = upstreams.flatMap(newClocks.get(_))
+      val upstreamClocks = upstreams.flatMap(newClocks.get)
       (processorId, upstreamClocks.toArray)
     }
 
     // Inits the clock of all processors.
-    newClocks.map { pair =>
+    newClocks.foreach { pair =>
       val (processorId, processorClock) = pair
       val upstreamClock = getUpStreamMinClock(processorId)
       val birth = processorClock.life.birth
@@ -249,7 +249,7 @@ class ClockService(private var dag: DAG, store: 
AppDataStore) extends Actor with
     case SnapshotStartClock =>
       snapshotStartClock()
 
-    case ReportCheckpointClock(task, time) =>
+    case UpdateCheckpointClock(task, time) =>
       updateCheckpointClocks(task, time)
 
     case GetCheckpointClock =>
@@ -258,10 +258,10 @@ class ClockService(private var dag: DAG, store: 
AppDataStore) extends Actor with
     case getStalling: GetStallingTasks =>
       sender ! StallingTasks(healthChecker.getReport.stallingTasks)
 
-    case ChangeToNewDAG(dag) =>
-      if (dag.version > this.dag.version) {
+    case ChangeToNewDAG(newDag) =>
+      if (newDag.version > this.dag.version) {
         // Transits to a new dag version
-        this.dag = dag
+        this.dag = newDag
         dynamicDAG(dag, getStartClock)
       } else {
         // Restarts current dag.
@@ -288,7 +288,7 @@ class ClockService(private var dag: DAG, store: 
AppDataStore) extends Actor with
 
     // Removes dead processor from checkpoints.
     checkpointClocks = checkpointClocks.filter { kv =>
-      val (taskId, processor) = kv
+      val (taskId, _) = kv
       taskId.processorId != processorId
     }
   }
@@ -300,13 +300,6 @@ class ClockService(private var dag: DAG, store: 
AppDataStore) extends Actor with
   def selfCheck(): Unit = {
     val minTimestamp = minClock
 
-    if (Long.MaxValue == minTimestamp) {
-      processorClocks.foreach { clock =>
-        LOG.info(s"Processor ${clock.processorId} Clock: min: ${clock.min}, " +
-          s"taskClocks: " + clock.taskClocks.mkString(","))
-      }
-    }
-
     healthChecker.check(minTimestamp, clocks, dag, System.currentTimeMillis())
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ae56903d/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskManager.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskManager.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskManager.scala
index b3ee046..48cc50e 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskManager.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskManager.scala
@@ -76,8 +76,15 @@ private[appmaster] class TaskManager(
 
   private val ids = new SessionIdFactory()
 
+  import 
org.apache.gearpump.streaming.Constants.GEARPUMP_STREAMING_EXECUTOR_RESTART_TIME_WINDOW
+  // the default 20 seconds is too small for tests
+  // so that executor will be restarted infinitely
   private val executorRestartPolicy = new ExecutorRestartPolicy(maxNrOfRetries 
= 5,
-    withinTimeRange = 20.seconds)
+    withinTimeRange = if 
(systemConfig.hasPath(GEARPUMP_STREAMING_EXECUTOR_RESTART_TIME_WINDOW)) {
+      
systemConfig.getInt(GEARPUMP_STREAMING_EXECUTOR_RESTART_TIME_WINDOW).seconds
+    } else {
+      20.seconds
+    })
 
   private implicit val timeout = Constants.FUTURE_TIMEOUT
   private implicit val actorSystem = context.system
@@ -102,7 +109,7 @@ private[appmaster] class TaskManager(
       sender ! TaskList(taskRegistry.getTaskExecutorMap)
     case LookupTaskActorRef(taskId) =>
       val executorId = taskRegistry.getExecutorId(taskId)
-      val requestor = sender
+      val requestor = sender()
       executorId.map { executorId =>
         val taskPath = ActorPathUtil.taskActorPath(appMaster, executorId, 
taskId)
         context.actorSelection(taskPath).resolveOne(3.seconds).map { 
taskActorRef =>

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ae56903d/streaming/src/main/scala/org/apache/gearpump/streaming/executor/Executor.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/executor/Executor.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/executor/Executor.scala
index 0eeb0eb..56bf61d 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/executor/Executor.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/executor/Executor.scala
@@ -70,10 +70,11 @@ class Executor(executorContext: ExecutorContext, userConf : 
UserConfig, launcher
   private val taskDispatcher = 
systemConfig.getString(Constants.GEARPUMP_TASK_DISPATCHER)
 
   private var state = State.ACTIVE
-  private var transitionStart = 0L
+
   // States transition start, in unix time
-  private var transitionEnd = 0L
+  private var transitionStart = 0L
   // States transition end, in unix time
+  private var transitionEnd = 0L
   private val transitWarningThreshold = 5000 // ms,
 
   // Starts health check Ticks
@@ -225,7 +226,7 @@ class Executor(executorContext: ExecutorContext, userConf : 
UserConfig, launcher
           registered :+ confirm.taskId))
 
       case rejected: TaskRejected =>
-        // Means this task shoud not exists...
+        // Means this task should not exist...
         tasks.get(rejected.taskId).foreach(_ ! PoisonPill)
         tasks -= rejected.taskId
         LOG.error(s"Task ${rejected.taskId} is rejected by AppMaster, shutting 
down it...")
@@ -344,6 +345,7 @@ class Executor(executorContext: ExecutorContext, userConf : 
UserConfig, launcher
 
   def onRestartTasks: Receive = {
     case RestartTasks(dagVersion) =>
+      transitionStart = System.currentTimeMillis()
       LOG.info(s"Executor received restart tasks")
       val tasksToRestart = tasks.keys.count(taskArgumentStore.get(dagVersion, 
_).nonEmpty)
       express.remoteAddressMap.send(Map.empty[Long, HostPort])

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ae56903d/streaming/src/main/scala/org/apache/gearpump/streaming/executor/ExecutorRestartPolicy.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/executor/ExecutorRestartPolicy.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/executor/ExecutorRestartPolicy.scala
index ef96ab9..90615ad 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/executor/ExecutorRestartPolicy.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/executor/ExecutorRestartPolicy.scala
@@ -36,29 +36,18 @@ import org.apache.gearpump.util.RestartPolicy
  */
 class ExecutorRestartPolicy(maxNrOfRetries: Int, withinTimeRange: Duration) {
   private var executorToTaskIds = Map.empty[Int, Set[TaskId]]
-  private var taskRestartPolocies = new immutable.HashMap[TaskId, 
RestartPolicy]
+  private var taskRestartPolicies = new immutable.HashMap[TaskId, 
RestartPolicy]
 
   def addTaskToExecutor(executorId: Int, taskId: TaskId): Unit = {
     var taskSetForExecutorId = executorToTaskIds.getOrElse(executorId, 
Set.empty[TaskId])
     taskSetForExecutorId += taskId
     executorToTaskIds += executorId -> taskSetForExecutorId
-    if (!taskRestartPolocies.contains(taskId)) {
-      taskRestartPolocies += taskId -> new RestartPolicy(maxNrOfRetries, 
withinTimeRange)
+    if (!taskRestartPolicies.contains(taskId)) {
+      taskRestartPolicies += taskId -> new RestartPolicy(maxNrOfRetries, 
withinTimeRange)
     }
   }
 
   def allowRestartExecutor(executorId: Int): Boolean = {
-    executorToTaskIds.get(executorId).map { taskIds =>
-      taskIds.foreach { taskId =>
-        taskRestartPolocies.get(taskId).map { policy =>
-          if (!policy.allowRestart) {
-            // scalastyle:off return
-            return false
-            // scalastyle:on return
-          }
-        }
-      }
-    }
-    true
+    executorToTaskIds(executorId).forall(taskId => 
taskRestartPolicies(taskId).allowRestart)
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ae56903d/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala
index f04dc9b..c7b503e 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala
@@ -23,7 +23,7 @@ import scala.concurrent.duration.FiniteDuration
 
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.state.impl.{CheckpointManager, 
PersistentStateConfig}
-import org.apache.gearpump.streaming.task.{ReportCheckpointClock, StartTime, 
Task, TaskContext}
+import org.apache.gearpump.streaming.task.{UpdateCheckpointClock, StartTime, 
Task, TaskContext}
 import org.apache.gearpump.streaming.transaction.api.CheckpointStoreFactory
 import org.apache.gearpump.util.LogUtil
 import org.apache.gearpump.{Message, TimeStamp}
@@ -110,6 +110,6 @@ abstract class PersistentTask[T](taskContext: TaskContext, 
conf: UserConfig)
   }
 
   private def reportCheckpointClock(timestamp: TimeStamp): Unit = {
-    appMaster ! ReportCheckpointClock(taskContext.taskId, timestamp)
+    appMaster ! UpdateCheckpointClock(taskContext.taskId, timestamp)
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ae56903d/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskControlMessage.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskControlMessage.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskControlMessage.scala
index e537f99..a915e7f 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskControlMessage.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskControlMessage.scala
@@ -48,15 +48,15 @@ object GetLatestMinClock extends ClockEvent
 
 case class GetUpstreamMinClock(taskId: TaskId) extends ClockEvent
 
-case class UpstreamMinClock(latestMinClock: TimeStamp)
+case class UpdateCheckpointClock(taskId: TaskId, clock: TimeStamp) extends 
ClockEvent
 
-case class LatestMinClock(clock: TimeStamp)
+case object GetCheckpointClock extends ClockEvent
 
-case class ReportCheckpointClock(taskId: TaskId, clock: TimeStamp)
+case class CheckpointClock(clock: Option[TimeStamp])
 
-case object GetCheckpointClock
+case class UpstreamMinClock(latestMinClock: TimeStamp)
 
-case class CheckpointClock(clock: Option[TimeStamp])
+case class LatestMinClock(clock: TimeStamp)
 
 case object GetStartClock
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ae56903d/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala
index 38a5cf1..c9f1b89 100644
--- 
a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala
@@ -17,13 +17,16 @@
  */
 package org.apache.gearpump.streaming.appmaster
 
-import akka.actor.{ActorRef, Props}
+
+import akka.actor.{ActorSystem, ActorRef, Props}
 import akka.testkit.{TestActorRef, TestProbe}
+import com.typesafe.config.ConfigFactory
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.AppMasterToMaster._
 import org.apache.gearpump.cluster.AppMasterToWorker.LaunchExecutor
-import org.apache.gearpump.cluster.ClientToMaster.ShutdownApplication
-import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterRegistered, 
ResourceAllocated}
+import org.apache.gearpump.cluster.ClientToMaster.{GetLastFailure, 
ShutdownApplication}
+import org.apache.gearpump.cluster.MasterToAppMaster._
+import org.apache.gearpump.cluster.MasterToClient.LastFailure
 import org.apache.gearpump.cluster.WorkerToAppMaster.ExecutorLaunchRejected
 import org.apache.gearpump.cluster._
 import org.apache.gearpump.cluster.appmaster.{AppMasterRuntimeEnvironment, 
AppMasterRuntimeInfo}
@@ -32,20 +35,28 @@ import org.apache.gearpump.cluster.scheduler.{Resource, 
ResourceAllocation, Reso
 import org.apache.gearpump.cluster.worker.WorkerId
 import org.apache.gearpump.jarstore.FilePath
 import org.apache.gearpump.partitioner.HashPartitioner
+import org.apache.gearpump.streaming.AppMasterToExecutor.StopTask
+import org.apache.gearpump.streaming.ExecutorToAppMaster.{MessageLoss, 
UnRegisterTask}
+import org.apache.gearpump.streaming.appmaster.AppMaster.{TaskActorRef, 
LookupTaskActorRef}
 import org.apache.gearpump.streaming.task.{StartTime, TaskContext, _}
-import org.apache.gearpump.streaming.{Processor, StreamApplication}
-import org.apache.gearpump.util.Graph
+import org.apache.gearpump.streaming.{Constants, DAG, Processor, 
StreamApplication}
+import org.apache.gearpump.util.ActorSystemBooter.RegisterActorSystem
+import org.apache.gearpump.util.{ActorUtil, Graph}
 import org.apache.gearpump.util.Graph._
 import org.scalatest._
 
 import scala.concurrent.duration._
 
 class AppMasterSpec extends WordSpec with Matchers with BeforeAndAfterEach 
with MasterHarness {
-  protected override def config = TestUtil.DEFAULT_CONFIG
+  protected override def config = {
+    
ConfigFactory.parseString(s"${Constants.GEARPUMP_STREAMING_EXECUTOR_RESTART_TIME_WINDOW}
 = 60")
+      .withFallback(TestUtil.DEFAULT_CONFIG)
+  }
 
   var appMaster: ActorRef = null
 
   val appId = 0
+  val invalidAppId = -1
   val workerId = WorkerId(1, 0L)
   val resource = Resource(1)
   val taskDescription1 = Processor[TaskA](2)
@@ -89,7 +100,7 @@ class AppMasterSpec extends WordSpec with Matchers with 
BeforeAndAfterEach with
         appMasterContext))(getActorSystem)
 
     val registerAppMaster = mockMaster.receiveOne(15.seconds)
-    assert(registerAppMaster.isInstanceOf[RegisterAppMaster])
+    registerAppMaster shouldBe a [RegisterAppMaster]
     appMaster = registerAppMaster.asInstanceOf[RegisterAppMaster].appMaster
 
     mockMaster.reply(AppMasterRegistered(appId))
@@ -108,7 +119,9 @@ class AppMasterSpec extends WordSpec with Matchers with 
BeforeAndAfterEach with
   }
 
   "AppMaster" should {
-    "kill it self when allocate resource time out" in {
+    "kill itself when allocate resource time out" in {
+      // not enough resource allocated
+      // triggers ResourceAllocationTimeout in ExecutorSystemScheduler
       mockMaster.reply(ResourceAllocated(Array(ResourceAllocation(Resource(2),
         mockWorker.ref, workerId))))
       mockMaster.expectMsg(60.seconds, ShutdownApplication(appId))
@@ -139,74 +152,144 @@ class AppMasterSpec extends WordSpec with Matchers with 
BeforeAndAfterEach with
       mockMaster.expectMsgClass(15.seconds, classOf[RegisterAppMaster])
     }
 
-    //    // TODO: This test is failing on Travis randomly
-    //    // We have not identifed the root cause.
-    //    // Check: https://travis-ci.org/intel-hadoop/gearpump/builds/56826843
-    //    // Issue tracker: https://github.com/intel-hadoop/gearpump/issues/733
-    //
-    //    "launch executor and task properly" in {
-    //      
mockMaster.reply(ResourceAllocated(Array(ResourceAllocation(Resource(4), 
mockWorker.ref,
-    //       workerId))))
-    //      mockWorker.expectMsgClass(classOf[LaunchExecutor])
-    //
-    //      val workerSystem = ActorSystem("worker", TestUtil.DEFAULT_CONFIG)
-    //      
mockWorker.reply(RegisterActorSystem(ActorUtil.getSystemAddress(workerSystem).toString))
-    //      for (i <- 1 to 4) {
-    //        mockMaster.expectMsg(10 seconds, AppMasterSpec.TaskStarted)
-    //      }
-    //
-    //      // clock status: task(0,0) -> 1, task(0,1)->0, task(1, 0)->0, 
task(1,1)->0
-    //      appMaster.tell(UpdateClock(TaskId(0, 0), 1), mockTask.ref)
-    //
-    //      // there is no further upstream, so the upstreamMinClock is 
Long.MaxValue
-    //      mockTask.expectMsg(UpstreamMinClock(Long.MaxValue))
-    //
-    //      // check min clock
-    //      appMaster.tell(GetLatestMinClock, mockTask.ref)
-    //      mockTask.expectMsg(LatestMinClock(0))
-    //
-    //
-    //      // clock status: task(0,0) -> 1, task(0,1)->1, task(1, 0)->0, 
task(1,1)->0
-    //      appMaster.tell(UpdateClock(TaskId(0, 1), 1), mockTask.ref)
-    //
-    //      // there is no further upstream, so the upstreamMinClock is 
Long.MaxValue
-    //      mockTask.expectMsg(UpstreamMinClock(Long.MaxValue))
-    //
-    //      // check min clock
-    //      appMaster.tell(GetLatestMinClock, mockTask.ref)
-    //      mockTask.expectMsg(LatestMinClock(0))
-    //
-    //      // Clock status: task(0,0) -> 1, task(0,1)->1, task(1, 1)->0, 
task(1,1)->0
-    //      appMaster.tell(UpdateClock(TaskId(1, 0), 1), mockTask.ref)
-    //
-    //      // Min clock of processor 0 (Task(0, 0) and Task(0, 1))
-    //      mockTask.expectMsg(UpstreamMinClock(1))
-    //
-    //      // check min clock
-    //      appMaster.tell(GetLatestMinClock, mockTask.ref)
-    //      mockTask.expectMsg(LatestMinClock(0))
-    //
-    //      // clock status: task(0,0) -> 1, task(0,1)->1, task(1, 1)->0, 
task(1,1)->1
-    //      appMaster.tell(UpdateClock(TaskId(1, 1), 1), mockTask.ref)
-    //
-    //      // min clock of processor 0 (Task(0, 0) and Task(0, 1))
-    //      mockTask.expectMsg(UpstreamMinClock(1))
-    //
-    //      // check min clock
-    //      appMaster.tell(GetLatestMinClock, mockTask.ref)
-    //      mockTask.expectMsg(LatestMinClock(1))
-    //
-    //      // shutdown worker and all executor on this work, expect appmaster 
to ask
-    //      // for new resources
-    //      workerSystem.shutdown()
-    //      mockMaster.expectMsg(RequestResource(appId, 
ResourceRequest(Resource(4), relaxation =
-    //        Relaxation.ONEWORKER)))
-    //    }
+    "launch executor and task properly" in {
+      val workerSystem = startApp()
+      expectAppStarted()
+
+      // clock status: task(0,0) -> 1, task(0,1)->0, task(1,0)->0, task(1,1)->0
+      appMaster.tell(UpdateClock(TaskId(0, 0), 1), mockTask.ref)
+
+      // there is no further upstream, so the upstreamMinClock is Long.MaxValue
+      mockTask.expectMsg(UpstreamMinClock(Long.MaxValue))
+
+      // check min clock
+      appMaster.tell(GetLatestMinClock, mockTask.ref)
+      mockTask.expectMsg(LatestMinClock(0))
+
+      // clock status: task(0,0) -> 1, task(0,1)->1, task(1, 0)->0, 
task(1,1)->0
+      appMaster.tell(UpdateClock(TaskId(0, 1), 1), mockTask.ref)
+
+      // there is no further upstream, so the upstreamMinClock is Long.MaxValue
+      mockTask.expectMsg(UpstreamMinClock(Long.MaxValue))
+
+      // check min clock
+      appMaster.tell(GetLatestMinClock, mockTask.ref)
+      mockTask.expectMsg(LatestMinClock(0))
+
+      // Clock status: task(0,0) -> 1, task(0,1)->1, task(1, 0)->1, task(1,1)->0
+      appMaster.tell(UpdateClock(TaskId(1, 0), 1), mockTask.ref)
+
+      // Min clock of processor 0 (Task(0, 0) and Task(0, 1))
+      mockTask.expectMsg(UpstreamMinClock(1))
+
+      // check min clock
+      appMaster.tell(GetLatestMinClock, mockTask.ref)
+      mockTask.expectMsg(LatestMinClock(0))
+
+      // clock status: task(0,0) -> 1, task(0,1)->1, task(1, 0)->1, task(1,1)->1
+      appMaster.tell(UpdateClock(TaskId(1, 1), 1), mockTask.ref)
+
+      // min clock of processor 0 (Task(0, 0) and Task(0, 1))
+      mockTask.expectMsg(UpstreamMinClock(1))
+
+      // check min clock
+      appMaster.tell(GetLatestMinClock, mockTask.ref)
+      mockTask.expectMsg(LatestMinClock(1))
+
+      // unregister task
+      for (i <- 0 to 1) {
+        appMaster.tell(UnRegisterTask(TaskId(i, 1), 0), mockTask.ref)
+        mockTask.expectMsg(StopTask(TaskId(i, 1)))
+      }
+
+      workerSystem.terminate()
+    }
+
+    "serve AppMaster data request" in {
+      val workerSystem = startApp()
+      expectAppStarted()
+
+      // get DAG
+      appMaster.tell(GetDAG, mockTask.ref)
+      mockTask.expectMsgType[DAG]
+
+      // get appmaster data
+      appMaster.tell(AppMasterDataDetailRequest(appId), mockTask.ref)
+      mockTask.expectMsgType[StreamAppMasterSummary](30.seconds)
+      appMaster.tell(AppMasterDataDetailRequest(invalidAppId), mockTask.ref)
+      mockTask.expectNoMsg()
+
+      for {
+        i <- 0 to 1
+        j <- 0 to 1
+      } {
+        // lookup task ActorRef
+        appMaster.tell(LookupTaskActorRef(TaskId(i, j)), mockTask.ref)
+        mockTask.expectMsgType[TaskActorRef]
+      }
+
+      workerSystem.terminate()
+    }
+
+    "replay on message loss" in {
+      val workerSystem = startApp()
+      expectAppStarted()
+
+      for (i <- 1 to 5) {
+        val taskId = TaskId(0, 0)
+        appMaster.tell(UpdateClock(taskId, i), mockTask.ref)
+        mockTask.expectMsg(UpstreamMinClock(Long.MaxValue))
+        val cause = s"message loss $i from $taskId"
+        appMaster.tell(MessageLoss(0, taskId, cause), mockTask.ref)
+        // appmaster restarted
+        expectAppStarted()
+
+        appMaster.tell(GetLastFailure(appId), mockTask.ref)
+        val failure = mockTask.expectMsgType[LastFailure]
+        failure.error shouldBe cause
+
+        appMaster.tell(GetLastFailure(invalidAppId), mockTask.ref)
+        mockTask.expectNoMsg()
+      }
+
+      // fails to recover after restarting a task for 5 times
+      appMaster.tell(MessageLoss(0, TaskId(0, 0), "message loss"), 
mockTask.ref)
+      mockMaster.expectMsg(60.seconds, ShutdownApplication(appId))
+
+      workerSystem.terminate()
+    }
+
+    "replay on client request" in {
+      startApp()
+      expectAppStarted()
+
+      appMaster.tell(ReplayFromTimestampWindowTrailingEdge(appId), 
mockTask.ref)
+      expectAppStarted()
+
+      appMaster.tell(ReplayFromTimestampWindowTrailingEdge(invalidAppId), 
mockTask.ref)
+      mockMaster.expectNoMsg()
+    }
   }
 
   def ignoreSaveAppData: PartialFunction[Any, Boolean] = {
     case msg: SaveAppData => true
   }
+
+  def startApp(): ActorSystem = {
+    mockMaster.reply(ResourceAllocated(Array(ResourceAllocation(Resource(4), 
mockWorker.ref,
+      workerId))))
+    mockWorker.expectMsgClass(classOf[LaunchExecutor])
+
+    val workerSystem = ActorSystem("worker", TestUtil.DEFAULT_CONFIG)
+    
mockWorker.reply(RegisterActorSystem(ActorUtil.getSystemAddress(workerSystem).toString))
+    workerSystem
+  }
+
+  def expectAppStarted(): Unit = {
+    // wait for app to get started
+    mockMaster.expectMsg(ActivateAppMaster(appId))
+    mockMaster.reply(AppMasterActivated(appId))
+  }
 }
 
 object AppMasterSpec {
@@ -218,9 +301,7 @@ object AppMasterSpec {
 
 class TaskA(taskContext: TaskContext, userConf: UserConfig) extends 
Task(taskContext, userConf) {
 
-  val master = userConf.getValue[ActorRef](AppMasterSpec.MASTER).get
   override def onStart(startTime: StartTime): Unit = {
-    master ! AppMasterSpec.TaskStarted
   }
 
   override def onNext(msg: Message): Unit = {}
@@ -228,9 +309,7 @@ class TaskA(taskContext: TaskContext, userConf: UserConfig) 
extends Task(taskCon
 
 class TaskB(taskContext: TaskContext, userConf: UserConfig) extends 
Task(taskContext, userConf) {
 
-  val master = userConf.getValue[ActorRef](AppMasterSpec.MASTER).get
   override def onStart(startTime: StartTime): Unit = {
-    master ! AppMasterSpec.TaskStarted
   }
 
   override def onNext(msg: Message): Unit = {}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ae56903d/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ClockServiceSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ClockServiceSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ClockServiceSpec.scala
index 0729877..e742a2c 100644
--- 
a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ClockServiceSpec.scala
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ClockServiceSpec.scala
@@ -97,7 +97,7 @@ class ClockServiceSpec(_system: ActorSystem) extends 
TestKit(_system) with Impli
         task3 ~ hash ~> task2,
         task2 ~ hash ~> task4,
         task5 ~ hash ~> task1
-      ))
+      ), version = 1)
       val user = TestProbe()
       clockService.tell(ChangeToNewDAG(dagAddMiddleNode), user.ref)
 
@@ -147,17 +147,17 @@ class ClockServiceSpec(_system: ActorSystem) extends 
TestKit(_system) with Impli
       clockService ! ChangeToNewDAG(dagWithStateTasks)
       expectMsgType[ChangeToNewDAGSuccess]
 
-      clockService ! ReportCheckpointClock(taskId3, startClock)
-      clockService ! ReportCheckpointClock(taskId4, startClock)
+      clockService ! UpdateCheckpointClock(taskId3, startClock)
+      clockService ! UpdateCheckpointClock(taskId4, startClock)
       clockService ! GetStartClock
       expectMsg(StartClock(startClock))
 
-      clockService ! ReportCheckpointClock(taskId3, 200L)
-      clockService ! ReportCheckpointClock(taskId4, 300L)
+      clockService ! UpdateCheckpointClock(taskId3, 200L)
+      clockService ! UpdateCheckpointClock(taskId4, 300L)
       clockService ! GetStartClock
       expectMsg(StartClock(startClock))
 
-      clockService ! ReportCheckpointClock(taskId3, 300L)
+      clockService ! UpdateCheckpointClock(taskId3, 300L)
       clockService ! GetStartClock
       expectMsg(StartClock(300L))
     }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ae56903d/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala
index dac5a5c..0dd3e5b 100644
--- 
a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala
@@ -228,9 +228,10 @@ class TaskManagerSpec extends FlatSpec with Matchers with 
BeforeAndAfterEach {
 
     // Step9: start broadcasting TaskLocations.
     import scala.concurrent.duration._
-    assert(executorManager.expectMsgPF(5.seconds) {
-      case BroadCast(taskLocationsReady) => 
taskLocationsReady.isInstanceOf[TaskLocationsReady]
-    })
+    executorManager.expectMsgType[BroadCast](5.seconds) match {
+      case BroadCast(taskLocationsReady) =>
+        taskLocationsReady shouldBe a [TaskLocationsReady]
+    }
 
     // Step10: Executor confirm it has received TaskLocationsReceived(version, 
executorId)
     taskManager.tell(TaskLocationsReceived(dag.version, executorId), 
executor.ref)
@@ -241,9 +242,10 @@ class TaskManagerSpec extends FlatSpec with Matchers with 
BeforeAndAfterEach {
 
     // Step12: start all tasks
     import scala.concurrent.duration._
-    assert(executorManager.expectMsgPF(5.seconds) {
-      case BroadCast(startAllTasks) => 
startAllTasks.isInstanceOf[StartAllTasks]
-    })
+    executorManager.expectMsgType[BroadCast](5.seconds) match {
+      case BroadCast(startAllTasks) =>
+        startAllTasks shouldBe a [StartAllTasks]
+    }
 
     // Step13, Tell ExecutorManager the updated usage status of executors.
     executorManager.expectMsgType[ExecutorResourceUsageSummary]


Reply via email to