Repository: incubator-gearpump Updated Branches: refs/heads/master 757dd367e -> ae56903da
fix GEARPUMP-61, improve code coverage Author: manuzhang <[email protected]> Closes #43 from manuzhang/GEARPUMP-61. Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/ae56903d Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/ae56903d Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/ae56903d Branch: refs/heads/master Commit: ae56903da7d3ae352eb919ace323e0efabd34128 Parents: 757dd36 Author: manuzhang <[email protected]> Authored: Wed Jun 22 16:23:55 2016 -0700 Committer: Kam Kasravi <[email protected]> Committed: Wed Jun 22 16:23:55 2016 -0700 ---------------------------------------------------------------------- .../org/apache/gearpump/util/ActorUtil.scala | 3 +- .../gearpump/util/HistoryMetricsService.scala | 3 - .../apache/gearpump/cluster/MasterHarness.scala | 2 +- .../state/processor/CountProcessorSpec.scala | 6 +- .../processor/WindowAverageProcessorSpec.scala | 6 +- .../processor/StormBoltOutputCollector.scala | 4 +- .../apache/gearpump/streaming/Constants.scala | 3 + .../streaming/appmaster/AppMaster.scala | 123 +++++----- .../streaming/appmaster/ClockService.scala | 33 ++- .../streaming/appmaster/TaskManager.scala | 11 +- .../gearpump/streaming/executor/Executor.scala | 8 +- .../executor/ExecutorRestartPolicy.scala | 19 +- .../streaming/state/api/PersistentTask.scala | 4 +- .../streaming/task/TaskControlMessage.scala | 10 +- .../streaming/appmaster/AppMasterSpec.scala | 229 +++++++++++++------ .../streaming/appmaster/ClockServiceSpec.scala | 12 +- .../streaming/appmaster/TaskManagerSpec.scala | 14 +- 17 files changed, 284 insertions(+), 206 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ae56903d/core/src/main/scala/org/apache/gearpump/util/ActorUtil.scala ---------------------------------------------------------------------- 
diff --git a/core/src/main/scala/org/apache/gearpump/util/ActorUtil.scala b/core/src/main/scala/org/apache/gearpump/util/ActorUtil.scala index 749cdf2..09f2969 100644 --- a/core/src/main/scala/org/apache/gearpump/util/ActorUtil.scala +++ b/core/src/main/scala/org/apache/gearpump/util/ActorUtil.scala @@ -54,11 +54,10 @@ object ActorUtil { def defaultMsgHandler(actor: ActorRef): Receive = { case msg: Any => - LOG.error(s"Cannot find a matching message, ${msg.getClass.toString}, forwarded from $actor") + LOG.error(s"Cannot find a matching message, $msg, forwarded from $actor") } def printActorSystemTree(system: ActorSystem): Unit = { - val extendedSystem = system.asInstanceOf[ExtendedActorSystem] val clazz = system.getClass val m = clazz.getDeclaredMethod("printTree") m.setAccessible(true) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ae56903d/core/src/main/scala/org/apache/gearpump/util/HistoryMetricsService.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/util/HistoryMetricsService.scala b/core/src/main/scala/org/apache/gearpump/util/HistoryMetricsService.scala index 549c34f..ee59678 100644 --- a/core/src/main/scala/org/apache/gearpump/util/HistoryMetricsService.scala +++ b/core/src/main/scala/org/apache/gearpump/util/HistoryMetricsService.scala @@ -358,9 +358,6 @@ object HistoryMetricsService { class GaugeMetricsStore(config: HistoryMetricsConfig) extends HistoryMetricsStore { - private val compartor = (left: HistoryMetricsItem, right: HistoryMetricsItem) => - left.value.asInstanceOf[Gauge].value > right.value.asInstanceOf[Gauge].value - private val history = new SingleValueMetricsStore( config.retainHistoryDataHours * 3600 * 1000 / config.retainHistoryDataIntervalMs, config.retainHistoryDataIntervalMs) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ae56903d/core/src/test/scala/org/apache/gearpump/cluster/MasterHarness.scala 
---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/gearpump/cluster/MasterHarness.scala b/core/src/test/scala/org/apache/gearpump/cluster/MasterHarness.scala index f2f374e..1ec5660 100644 --- a/core/src/test/scala/org/apache/gearpump/cluster/MasterHarness.scala +++ b/core/src/test/scala/org/apache/gearpump/cluster/MasterHarness.scala @@ -43,7 +43,7 @@ trait MasterHarness { private var systemAddress: Address = null private var host: String = null private var port: Int = 0 - private var masterProperties = new Properties() + private val masterProperties = new Properties() val PROCESS_BOOT_TIME = Duration(25, TimeUnit.SECONDS) def getActorSystem: ActorSystem = system http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ae56903d/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala b/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala index cdc8cb2..6048034 100644 --- a/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala +++ b/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala @@ -33,7 +33,7 @@ import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.MockUtil import org.apache.gearpump.streaming.state.api.PersistentTask import org.apache.gearpump.streaming.state.impl.{InMemoryCheckpointStoreFactory, PersistentStateConfig} -import org.apache.gearpump.streaming.task.{ReportCheckpointClock, StartTime} +import org.apache.gearpump.streaming.task.{UpdateCheckpointClock, StartTime} import 
org.apache.gearpump.streaming.transaction.api.CheckpointStoreFactory class CountProcessorSpec extends PropSpec with PropertyChecks with Matchers { @@ -60,7 +60,7 @@ class CountProcessorSpec extends PropSpec with PropertyChecks with Matchers { when(taskContext.appMaster).thenReturn(appMaster.ref) count.onStart(StartTime(0L)) - appMaster.expectMsg(ReportCheckpointClock(taskContext.taskId, 0L)) + appMaster.expectMsg(UpdateCheckpointClock(taskContext.taskId, 0L)) for (i <- 0L to num) { count.onNext(Message("", i)) @@ -75,7 +75,7 @@ class CountProcessorSpec extends PropSpec with PropertyChecks with Matchers { when(taskContext.upstreamMinClock).thenReturn(num) count.onNext(PersistentTask.CHECKPOINT) // Only the state before checkpoint time is checkpointed - appMaster.expectMsg(ReportCheckpointClock(taskContext.taskId, num)) + appMaster.expectMsg(UpdateCheckpointClock(taskContext.taskId, num)) } system.terminate() http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ae56903d/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/WindowAverageProcessorSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/WindowAverageProcessorSpec.scala b/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/WindowAverageProcessorSpec.scala index a488c9f..0963429 100644 --- a/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/WindowAverageProcessorSpec.scala +++ b/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/WindowAverageProcessorSpec.scala @@ -34,7 +34,7 @@ import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.MockUtil import org.apache.gearpump.streaming.state.api.PersistentTask import 
org.apache.gearpump.streaming.state.impl.{InMemoryCheckpointStoreFactory, PersistentStateConfig, WindowConfig} -import org.apache.gearpump.streaming.task.{ReportCheckpointClock, StartTime} +import org.apache.gearpump.streaming.task.{UpdateCheckpointClock, StartTime} import org.apache.gearpump.streaming.transaction.api.CheckpointStoreFactory class WindowAverageProcessorSpec extends PropSpec with PropertyChecks with Matchers { @@ -62,7 +62,7 @@ class WindowAverageProcessorSpec extends PropSpec with PropertyChecks with Match when(taskContext.appMaster).thenReturn(appMaster.ref) windowAverage.onStart(StartTime(0L)) - appMaster.expectMsg(ReportCheckpointClock(taskContext.taskId, 0L)) + appMaster.expectMsg(UpdateCheckpointClock(taskContext.taskId, 0L)) for (i <- 0L until num) { windowAverage.onNext(Message("" + data, i)) @@ -77,7 +77,7 @@ class WindowAverageProcessorSpec extends PropSpec with PropertyChecks with Match // Time to checkpoint when(taskContext.upstreamMinClock).thenReturn(num) windowAverage.onNext(PersistentTask.CHECKPOINT) - appMaster.expectMsg(ReportCheckpointClock(taskContext.taskId, num)) + appMaster.expectMsg(UpdateCheckpointClock(taskContext.taskId, num)) } system.terminate() http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ae56903d/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/processor/StormBoltOutputCollector.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/processor/StormBoltOutputCollector.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/processor/StormBoltOutputCollector.scala index a70ce48..f60cc49 100644 --- a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/processor/StormBoltOutputCollector.scala +++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/processor/StormBoltOutputCollector.scala @@ -25,7 +25,7 @@ import 
backtype.storm.tuple.Tuple import org.apache.gearpump.experiments.storm.topology.TimedTuple import org.apache.gearpump.experiments.storm.util.StormConstants._ import org.apache.gearpump.experiments.storm.util.StormOutputCollector -import org.apache.gearpump.streaming.task.ReportCheckpointClock +import org.apache.gearpump.streaming.task.UpdateCheckpointClock /** * this is used by Storm bolt to emit messages @@ -60,7 +60,7 @@ private[storm] class StormBoltOutputCollector(collector: StormOutputCollector, val upstreamMinClock = taskContext.upstreamMinClock if (reportTime <= upstreamMinClock && upstreamMinClock <= maxAckTime) { reportTime = upstreamMinClock / CHECKPOINT_INTERVAL_MILLIS * CHECKPOINT_INTERVAL_MILLIS - taskContext.appMaster ! ReportCheckpointClock(taskContext.taskId, reportTime) + taskContext.appMaster ! UpdateCheckpointClock(taskContext.taskId, reportTime) reportTime += CHECKPOINT_INTERVAL_MILLIS } case _ => http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ae56903d/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala index 445f26c..320e46f 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala @@ -33,4 +33,7 @@ object Constants { val GEARPUMP_STREAMING_ACK_ONCE_EVERY_MESSAGE_COUNT = "gearpump.streaming.ack-once-every-message-count" + + val GEARPUMP_STREAMING_EXECUTOR_RESTART_TIME_WINDOW = + "gearpump.streaming.executor-restart-time-window" } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ae56903d/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala ---------------------------------------------------------------------- diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala index d0cfc80..d1a03de 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala @@ -155,21 +155,21 @@ class AppMaster(appContext: AppMasterContext, app: AppDescription) extends Appli // Checks whether this processor dead, if it is, then we should remove it from clockService. clockService.foreach(_ forward CheckProcessorDeath(unRegister.taskId.processorId)) case replay: ReplayFromTimestampWindowTrailingEdge => - taskManager.foreach(_ forward replay) + if (replay.appId == appId) { + taskManager.foreach(_ forward replay) + } else { + LOG.error(s"replay for invalid appId ${replay.appId}") + } case messageLoss: MessageLoss => lastFailure = LastFailure(System.currentTimeMillis(), messageLoss.cause) taskManager.foreach(_ forward messageLoss) case lookupTask: LookupTaskActorRef => taskManager.foreach(_ forward lookupTask) - case checkpoint: ReportCheckpointClock => - clockService.foreach(_ forward checkpoint) case GetDAG => val task = sender() getDAG.foreach { dag => task ! 
dag } - case GetCheckpointClock => - clockService.foreach(_ forward GetCheckpointClock) } /** Handles messages from Executors */ @@ -182,62 +182,65 @@ class AppMaster(appContext: AppMasterContext, app: AppDescription) extends Appli /** Handles messages from AppMaster */ def appMasterService: Receive = { - case appMasterDataDetailRequest: AppMasterDataDetailRequest => - LOG.debug(s"AppMaster got AppMasterDataDetailRequest for $appId ") - - val executorsFuture = executorBrief - val clockFuture = getMinClock - val taskFuture = getTaskList - val dagFuture = getDAG - - val appMasterDataDetail = for { - executors <- executorsFuture - clock <- clockFuture - tasks <- taskFuture - dag <- dagFuture - } yield { - val graph = dag.graph - - val executorToTasks = tasks.tasks.groupBy(_._2).mapValues { - _.keys.toList - } + case AppMasterDataDetailRequest(rid) => + if (rid == appId) { + LOG.info(s"AppMaster got AppMasterDataDetailRequest for $appId ") + + val executorsFuture = executorBrief + val clockFuture = getMinClock + val taskFuture = getTaskList + val dagFuture = getDAG + + val appMasterDataDetail = for { + executors <- executorsFuture + clock <- clockFuture + tasks <- taskFuture + dag <- dagFuture + } yield { + val graph = dag.graph + + val executorToTasks = tasks.tasks.groupBy(_._2).mapValues { + _.keys.toList + } - val processors = dag.processors.map { kv => - val processor = kv._2 - import processor._ - val tasks = executorToTasks.map { kv => - (kv._1, TaskCount(kv._2.count(_.processorId == id))) - }.filter(_._2.count != 0) - (id, - ProcessorSummary(id, taskClass, parallelism, description, taskConf, life, + val processors = dag.processors.map { kv => + val processor = kv._2 + import processor._ + val tasks = executorToTasks.map { kv => + (kv._1, TaskCount(kv._2.count(_.processorId == id))) + }.filter(_._2.count != 0) + (id, ProcessorSummary(id, taskClass, parallelism, description, taskConf, life, tasks.keys.toList, tasks)) - } + } - StreamAppMasterSummary( - appId = 
appId, - appName = app.name, - actorPath = address, - clock = clock, - status = MasterToAppMaster.AppMasterActive, - startTime = startTime, - uptime = System.currentTimeMillis() - startTime, - user = username, - homeDirectory = userDir, - logFile = logFile.getAbsolutePath, - processors = processors, - processorLevels = graph.vertexHierarchyLevelMap(), - dag = graph.mapEdge { (node1, edge, node2) => - edge.partitionerFactory.name - }, - executors = executors, - historyMetricsConfig = getHistoryMetricsConfig - ) - } + StreamAppMasterSummary( + appId = appId, + appName = app.name, + actorPath = address, + clock = clock, + status = MasterToAppMaster.AppMasterActive, + startTime = startTime, + uptime = System.currentTimeMillis() - startTime, + user = username, + homeDirectory = userDir, + logFile = logFile.getAbsolutePath, + processors = processors, + processorLevels = graph.vertexHierarchyLevelMap(), + dag = graph.mapEdge { (node1, edge, node2) => + edge.partitionerFactory.name + }, + executors = executors, + historyMetricsConfig = getHistoryMetricsConfig + ) + } - val client = sender() + val client = sender() - appMasterDataDetail.map { appData => - client ! appData + appMasterDataDetail.map { appData => + client ! appData + } + } else { + LOG.error(s"AppMasterDataDetailRequest for invalid appId $rid") } // TODO: WebSocket is buggy and disabled. // case appMasterMetricsRequest: AppMasterMetricsRequest => @@ -254,8 +257,12 @@ class AppMaster(appContext: AppMasterContext, app: AppDescription) extends Appli clockService.foreach(_ forward getStalling) case replaceDAG: ReplaceProcessor => dagManager forward replaceDAG - case GetLastFailure(_) => - sender ! lastFailure + case GetLastFailure(id) => + if (id == appId) { + sender ! 
lastFailure + } else { + LOG.error(s"GetLastFailure for invalid appId $id") + } case get@GetExecutorSummary(executorId) => val client = sender() if (executorId == APPMASTER_DEFAULT_EXECUTOR_ID) { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ae56903d/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala index bff0f67..6fc0782 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala @@ -70,8 +70,8 @@ class ClockService(private var dag: DAG, store: AppDataStore) extends Actor with } override def postStop(): Unit = { - Option(healthCheckScheduler).map(_.cancel) - Option(snapshotScheduler).map(_.cancel) + Option(healthCheckScheduler).map(_.cancel()) + Option(snapshotScheduler).map(_.cancel()) } // Keep track of clock value of all processors. 
@@ -89,7 +89,7 @@ class ClockService(private var dag: DAG, store: AppDataStore) extends Actor with private def checkpointEnabled(processor: ProcessorDescription): Boolean = { val taskConf = processor.taskConf - taskConf != null && taskConf.getBoolean("state.checkpoint.enable") == Some(true) + taskConf != null && taskConf.getBoolean("state.checkpoint.enable").contains(true) } private def resetCheckpointClocks(dag: DAG, startClock: TimeStamp): Unit = { @@ -119,10 +119,10 @@ class ClockService(private var dag: DAG, store: AppDataStore) extends Actor with } this.upstreamClocks = clocks.map { pair => - val (processorId, processor) = pair + val (processorId, _) = pair val upstreams = dag.graph.incomingEdgesOf(processorId).map(_._1) - val upstreamClocks = upstreams.flatMap(clocks.get(_)) + val upstreamClocks = upstreams.flatMap(clocks.get) (processorId, upstreamClocks.toArray) } @@ -148,15 +148,15 @@ class ClockService(private var dag: DAG, store: AppDataStore) extends Actor with this.clocks = newClocks this.upstreamClocks = newClocks.map { pair => - val (processorId, processor) = pair + val (processorId, _) = pair val upstreams = dag.graph.incomingEdgesOf(processorId).map(_._1) - val upstreamClocks = upstreams.flatMap(newClocks.get(_)) + val upstreamClocks = upstreams.flatMap(newClocks.get) (processorId, upstreamClocks.toArray) } // Inits the clock of all processors. 
- newClocks.map { pair => + newClocks.foreach { pair => val (processorId, processorClock) = pair val upstreamClock = getUpStreamMinClock(processorId) val birth = processorClock.life.birth @@ -249,7 +249,7 @@ class ClockService(private var dag: DAG, store: AppDataStore) extends Actor with case SnapshotStartClock => snapshotStartClock() - case ReportCheckpointClock(task, time) => + case UpdateCheckpointClock(task, time) => updateCheckpointClocks(task, time) case GetCheckpointClock => @@ -258,10 +258,10 @@ class ClockService(private var dag: DAG, store: AppDataStore) extends Actor with case getStalling: GetStallingTasks => sender ! StallingTasks(healthChecker.getReport.stallingTasks) - case ChangeToNewDAG(dag) => - if (dag.version > this.dag.version) { + case ChangeToNewDAG(newDag) => + if (newDag.version > this.dag.version) { // Transits to a new dag version - this.dag = dag + this.dag = newDag dynamicDAG(dag, getStartClock) } else { // Restarts current dag. @@ -288,7 +288,7 @@ class ClockService(private var dag: DAG, store: AppDataStore) extends Actor with // Removes dead processor from checkpoints. 
checkpointClocks = checkpointClocks.filter { kv => - val (taskId, processor) = kv + val (taskId, _) = kv taskId.processorId != processorId } } @@ -300,13 +300,6 @@ class ClockService(private var dag: DAG, store: AppDataStore) extends Actor with def selfCheck(): Unit = { val minTimestamp = minClock - if (Long.MaxValue == minTimestamp) { - processorClocks.foreach { clock => - LOG.info(s"Processor ${clock.processorId} Clock: min: ${clock.min}, " + - s"taskClocks: " + clock.taskClocks.mkString(",")) - } - } - healthChecker.check(minTimestamp, clocks, dag, System.currentTimeMillis()) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ae56903d/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskManager.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskManager.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskManager.scala index b3ee046..48cc50e 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskManager.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskManager.scala @@ -76,8 +76,15 @@ private[appmaster] class TaskManager( private val ids = new SessionIdFactory() + import org.apache.gearpump.streaming.Constants.GEARPUMP_STREAMING_EXECUTOR_RESTART_TIME_WINDOW + // The default 20-second window is too short for tests: restarts there are slow enough + // that the 5-retry limit is never reached within it, so executors restart indefinitely. private val executorRestartPolicy = new ExecutorRestartPolicy(maxNrOfRetries = 5, - withinTimeRange = 20.seconds) + withinTimeRange = if (systemConfig.hasPath(GEARPUMP_STREAMING_EXECUTOR_RESTART_TIME_WINDOW)) { + systemConfig.getInt(GEARPUMP_STREAMING_EXECUTOR_RESTART_TIME_WINDOW).seconds + } else { + 20.seconds + }) private implicit val timeout = Constants.FUTURE_TIMEOUT private implicit val actorSystem = context.system @@ -102,7 +109,7 @@ private[appmaster] class TaskManager( sender !
TaskList(taskRegistry.getTaskExecutorMap) case LookupTaskActorRef(taskId) => val executorId = taskRegistry.getExecutorId(taskId) - val requestor = sender + val requestor = sender() executorId.map { executorId => val taskPath = ActorPathUtil.taskActorPath(appMaster, executorId, taskId) context.actorSelection(taskPath).resolveOne(3.seconds).map { taskActorRef => http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ae56903d/streaming/src/main/scala/org/apache/gearpump/streaming/executor/Executor.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/executor/Executor.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/executor/Executor.scala index 0eeb0eb..56bf61d 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/executor/Executor.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/executor/Executor.scala @@ -70,10 +70,11 @@ class Executor(executorContext: ExecutorContext, userConf : UserConfig, launcher private val taskDispatcher = systemConfig.getString(Constants.GEARPUMP_TASK_DISPATCHER) private var state = State.ACTIVE - private var transitionStart = 0L + // States transition start, in unix time - private var transitionEnd = 0L + private var transitionStart = 0L // States transition end, in unix time + private var transitionEnd = 0L private val transitWarningThreshold = 5000 // ms, // Starts health check Ticks @@ -225,7 +226,7 @@ class Executor(executorContext: ExecutorContext, userConf : UserConfig, launcher registered :+ confirm.taskId)) case rejected: TaskRejected => - // Means this task shoud not exists... + // Means this task should not exist... tasks.get(rejected.taskId).foreach(_ ! 
PoisonPill) tasks -= rejected.taskId LOG.error(s"Task ${rejected.taskId} is rejected by AppMaster, shutting down it...") @@ -344,6 +345,7 @@ class Executor(executorContext: ExecutorContext, userConf : UserConfig, launcher def onRestartTasks: Receive = { case RestartTasks(dagVersion) => + transitionStart = System.currentTimeMillis() LOG.info(s"Executor received restart tasks") val tasksToRestart = tasks.keys.count(taskArgumentStore.get(dagVersion, _).nonEmpty) express.remoteAddressMap.send(Map.empty[Long, HostPort]) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ae56903d/streaming/src/main/scala/org/apache/gearpump/streaming/executor/ExecutorRestartPolicy.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/executor/ExecutorRestartPolicy.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/executor/ExecutorRestartPolicy.scala index ef96ab9..90615ad 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/executor/ExecutorRestartPolicy.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/executor/ExecutorRestartPolicy.scala @@ -36,29 +36,18 @@ import org.apache.gearpump.util.RestartPolicy */ class ExecutorRestartPolicy(maxNrOfRetries: Int, withinTimeRange: Duration) { private var executorToTaskIds = Map.empty[Int, Set[TaskId]] - private var taskRestartPolocies = new immutable.HashMap[TaskId, RestartPolicy] + private var taskRestartPolicies = new immutable.HashMap[TaskId, RestartPolicy] def addTaskToExecutor(executorId: Int, taskId: TaskId): Unit = { var taskSetForExecutorId = executorToTaskIds.getOrElse(executorId, Set.empty[TaskId]) taskSetForExecutorId += taskId executorToTaskIds += executorId -> taskSetForExecutorId - if (!taskRestartPolocies.contains(taskId)) { - taskRestartPolocies += taskId -> new RestartPolicy(maxNrOfRetries, withinTimeRange) + if (!taskRestartPolicies.contains(taskId)) { + taskRestartPolicies += taskId 
-> new RestartPolicy(maxNrOfRetries, withinTimeRange) } } def allowRestartExecutor(executorId: Int): Boolean = { - executorToTaskIds.get(executorId).map { taskIds => - taskIds.foreach { taskId => - taskRestartPolocies.get(taskId).map { policy => - if (!policy.allowRestart) { - // scalastyle:off return - return false - // scalastyle:on return - } - } - } - } - true + executorToTaskIds.get(executorId).forall(_.forall(id => taskRestartPolicies(id).allowRestart)) } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ae56903d/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala index f04dc9b..c7b503e 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala @@ -23,7 +23,7 @@ import scala.concurrent.duration.FiniteDuration import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.state.impl.{CheckpointManager, PersistentStateConfig} -import org.apache.gearpump.streaming.task.{ReportCheckpointClock, StartTime, Task, TaskContext} +import org.apache.gearpump.streaming.task.{UpdateCheckpointClock, StartTime, Task, TaskContext} import org.apache.gearpump.streaming.transaction.api.CheckpointStoreFactory import org.apache.gearpump.util.LogUtil import org.apache.gearpump.{Message, TimeStamp} @@ -110,6 +110,6 @@ abstract class PersistentTask[T](taskContext: TaskContext, conf: UserConfig) } private def reportCheckpointClock(timestamp: TimeStamp): Unit = { - appMaster ! ReportCheckpointClock(taskContext.taskId, timestamp) + appMaster !
UpdateCheckpointClock(taskContext.taskId, timestamp) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ae56903d/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskControlMessage.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskControlMessage.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskControlMessage.scala index e537f99..a915e7f 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskControlMessage.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskControlMessage.scala @@ -48,15 +48,15 @@ object GetLatestMinClock extends ClockEvent case class GetUpstreamMinClock(taskId: TaskId) extends ClockEvent -case class UpstreamMinClock(latestMinClock: TimeStamp) +case class UpdateCheckpointClock(taskId: TaskId, clock: TimeStamp) extends ClockEvent -case class LatestMinClock(clock: TimeStamp) +case object GetCheckpointClock extends ClockEvent -case class ReportCheckpointClock(taskId: TaskId, clock: TimeStamp) +case class CheckpointClock(clock: Option[TimeStamp]) -case object GetCheckpointClock +case class UpstreamMinClock(latestMinClock: TimeStamp) -case class CheckpointClock(clock: Option[TimeStamp]) +case class LatestMinClock(clock: TimeStamp) case object GetStartClock http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ae56903d/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala index 38a5cf1..c9f1b89 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala +++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala @@ -17,13 +17,16 @@ */ package org.apache.gearpump.streaming.appmaster -import akka.actor.{ActorRef, Props} + +import akka.actor.{ActorSystem, ActorRef, Props} import akka.testkit.{TestActorRef, TestProbe} +import com.typesafe.config.ConfigFactory import org.apache.gearpump.Message import org.apache.gearpump.cluster.AppMasterToMaster._ import org.apache.gearpump.cluster.AppMasterToWorker.LaunchExecutor -import org.apache.gearpump.cluster.ClientToMaster.ShutdownApplication -import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterRegistered, ResourceAllocated} +import org.apache.gearpump.cluster.ClientToMaster.{GetLastFailure, ShutdownApplication} +import org.apache.gearpump.cluster.MasterToAppMaster._ +import org.apache.gearpump.cluster.MasterToClient.LastFailure import org.apache.gearpump.cluster.WorkerToAppMaster.ExecutorLaunchRejected import org.apache.gearpump.cluster._ import org.apache.gearpump.cluster.appmaster.{AppMasterRuntimeEnvironment, AppMasterRuntimeInfo} @@ -32,20 +35,28 @@ import org.apache.gearpump.cluster.scheduler.{Resource, ResourceAllocation, Reso import org.apache.gearpump.cluster.worker.WorkerId import org.apache.gearpump.jarstore.FilePath import org.apache.gearpump.partitioner.HashPartitioner +import org.apache.gearpump.streaming.AppMasterToExecutor.StopTask +import org.apache.gearpump.streaming.ExecutorToAppMaster.{MessageLoss, UnRegisterTask} +import org.apache.gearpump.streaming.appmaster.AppMaster.{TaskActorRef, LookupTaskActorRef} import org.apache.gearpump.streaming.task.{StartTime, TaskContext, _} -import org.apache.gearpump.streaming.{Processor, StreamApplication} -import org.apache.gearpump.util.Graph +import org.apache.gearpump.streaming.{Constants, DAG, Processor, StreamApplication} +import org.apache.gearpump.util.ActorSystemBooter.RegisterActorSystem +import org.apache.gearpump.util.{ActorUtil, Graph} import 
org.apache.gearpump.util.Graph._ import org.scalatest._ import scala.concurrent.duration._ class AppMasterSpec extends WordSpec with Matchers with BeforeAndAfterEach with MasterHarness { - protected override def config = TestUtil.DEFAULT_CONFIG + protected override def config = { + ConfigFactory.parseString(s"${Constants.GEARPUMP_STREAMING_EXECUTOR_RESTART_TIME_WINDOW} = 60") + .withFallback(TestUtil.DEFAULT_CONFIG) + } var appMaster: ActorRef = null val appId = 0 + val invalidAppId = -1 val workerId = WorkerId(1, 0L) val resource = Resource(1) val taskDescription1 = Processor[TaskA](2) @@ -89,7 +100,7 @@ class AppMasterSpec extends WordSpec with Matchers with BeforeAndAfterEach with appMasterContext))(getActorSystem) val registerAppMaster = mockMaster.receiveOne(15.seconds) - assert(registerAppMaster.isInstanceOf[RegisterAppMaster]) + registerAppMaster shouldBe a [RegisterAppMaster] appMaster = registerAppMaster.asInstanceOf[RegisterAppMaster].appMaster mockMaster.reply(AppMasterRegistered(appId)) @@ -108,7 +119,9 @@ class AppMasterSpec extends WordSpec with Matchers with BeforeAndAfterEach with } "AppMaster" should { - "kill it self when allocate resource time out" in { + "kill itself when allocate resource time out" in { + // not enough resource allocated + // triggers ResourceAllocationTimeout in ExecutorSystemScheduler mockMaster.reply(ResourceAllocated(Array(ResourceAllocation(Resource(2), mockWorker.ref, workerId)))) mockMaster.expectMsg(60.seconds, ShutdownApplication(appId)) @@ -139,74 +152,144 @@ class AppMasterSpec extends WordSpec with Matchers with BeforeAndAfterEach with mockMaster.expectMsgClass(15.seconds, classOf[RegisterAppMaster]) } - // // TODO: This test is failing on Travis randomly - // // We have not identifed the root cause. 
- // // Check: https://travis-ci.org/intel-hadoop/gearpump/builds/56826843 - // // Issue tracker: https://github.com/intel-hadoop/gearpump/issues/733 - // - // "launch executor and task properly" in { - // mockMaster.reply(ResourceAllocated(Array(ResourceAllocation(Resource(4), mockWorker.ref, - // workerId)))) - // mockWorker.expectMsgClass(classOf[LaunchExecutor]) - // - // val workerSystem = ActorSystem("worker", TestUtil.DEFAULT_CONFIG) - // mockWorker.reply(RegisterActorSystem(ActorUtil.getSystemAddress(workerSystem).toString)) - // for (i <- 1 to 4) { - // mockMaster.expectMsg(10 seconds, AppMasterSpec.TaskStarted) - // } - // - // // clock status: task(0,0) -> 1, task(0,1)->0, task(1, 0)->0, task(1,1)->0 - // appMaster.tell(UpdateClock(TaskId(0, 0), 1), mockTask.ref) - // - // // there is no further upstream, so the upstreamMinClock is Long.MaxValue - // mockTask.expectMsg(UpstreamMinClock(Long.MaxValue)) - // - // // check min clock - // appMaster.tell(GetLatestMinClock, mockTask.ref) - // mockTask.expectMsg(LatestMinClock(0)) - // - // - // // clock status: task(0,0) -> 1, task(0,1)->1, task(1, 0)->0, task(1,1)->0 - // appMaster.tell(UpdateClock(TaskId(0, 1), 1), mockTask.ref) - // - // // there is no further upstream, so the upstreamMinClock is Long.MaxValue - // mockTask.expectMsg(UpstreamMinClock(Long.MaxValue)) - // - // // check min clock - // appMaster.tell(GetLatestMinClock, mockTask.ref) - // mockTask.expectMsg(LatestMinClock(0)) - // - // // Clock status: task(0,0) -> 1, task(0,1)->1, task(1, 1)->0, task(1,1)->0 - // appMaster.tell(UpdateClock(TaskId(1, 0), 1), mockTask.ref) - // - // // Min clock of processor 0 (Task(0, 0) and Task(0, 1)) - // mockTask.expectMsg(UpstreamMinClock(1)) - // - // // check min clock - // appMaster.tell(GetLatestMinClock, mockTask.ref) - // mockTask.expectMsg(LatestMinClock(0)) - // - // // clock status: task(0,0) -> 1, task(0,1)->1, task(1, 1)->0, task(1,1)->1 - // appMaster.tell(UpdateClock(TaskId(1, 1), 1), 
mockTask.ref) - // - // // min clock of processor 0 (Task(0, 0) and Task(0, 1)) - // mockTask.expectMsg(UpstreamMinClock(1)) - // - // // check min clock - // appMaster.tell(GetLatestMinClock, mockTask.ref) - // mockTask.expectMsg(LatestMinClock(1)) - // - // // shutdown worker and all executor on this work, expect appmaster to ask - // // for new resources - // workerSystem.shutdown() - // mockMaster.expectMsg(RequestResource(appId, ResourceRequest(Resource(4), relaxation = - // Relaxation.ONEWORKER))) - // } + "launch executor and task properly" in { + val workerSystem = startApp() + expectAppStarted() + + // clock status: task(0,0) -> 1, task(0,1)->0, task(1,0)->0, task(1,1)->0 + appMaster.tell(UpdateClock(TaskId(0, 0), 1), mockTask.ref) + + // there is no further upstream, so the upstreamMinClock is Long.MaxValue + mockTask.expectMsg(UpstreamMinClock(Long.MaxValue)) + + // check min clock + appMaster.tell(GetLatestMinClock, mockTask.ref) + mockTask.expectMsg(LatestMinClock(0)) + + // clock status: task(0,0) -> 1, task(0,1)->1, task(1, 0)->0, task(1,1)->0 + appMaster.tell(UpdateClock(TaskId(0, 1), 1), mockTask.ref) + + // there is no further upstream, so the upstreamMinClock is Long.MaxValue + mockTask.expectMsg(UpstreamMinClock(Long.MaxValue)) + + // check min clock + appMaster.tell(GetLatestMinClock, mockTask.ref) + mockTask.expectMsg(LatestMinClock(0)) + + // Clock status: task(0,0) -> 1, task(0,1)->1, task(1, 1)->0, task(1,1)->0 + appMaster.tell(UpdateClock(TaskId(1, 0), 1), mockTask.ref) + + // Min clock of processor 0 (Task(0, 0) and Task(0, 1)) + mockTask.expectMsg(UpstreamMinClock(1)) + + // check min clock + appMaster.tell(GetLatestMinClock, mockTask.ref) + mockTask.expectMsg(LatestMinClock(0)) + + // clock status: task(0,0) -> 1, task(0,1)->1, task(1, 1)->0, task(1,1)->1 + appMaster.tell(UpdateClock(TaskId(1, 1), 1), mockTask.ref) + + // min clock of processor 0 (Task(0, 0) and Task(0, 1)) + mockTask.expectMsg(UpstreamMinClock(1)) + + // check min 
clock + appMaster.tell(GetLatestMinClock, mockTask.ref) + mockTask.expectMsg(LatestMinClock(1)) + + // unregister task + for (i <- 0 to 1) { + appMaster.tell(UnRegisterTask(TaskId(i, 1), 0), mockTask.ref) + mockTask.expectMsg(StopTask(TaskId(i, 1))) + } + + workerSystem.terminate() + } + + "serve AppMaster data request" in { + val workerSystem = startApp() + expectAppStarted() + + // get DAG + appMaster.tell(GetDAG, mockTask.ref) + mockTask.expectMsgType[DAG] + + // get appmaster data + appMaster.tell(AppMasterDataDetailRequest(appId), mockTask.ref) + mockTask.expectMsgType[StreamAppMasterSummary](30.seconds) + appMaster.tell(AppMasterDataDetailRequest(invalidAppId), mockTask.ref) + mockTask.expectNoMsg() + + for { + i <- 0 to 1 + j <- 0 to 1 + } { + // lookup task ActorRef + appMaster.tell(LookupTaskActorRef(TaskId(i, j)), mockTask.ref) + mockTask.expectMsgType[TaskActorRef] + } + + workerSystem.terminate() + } + + "replay on message loss" in { + val workerSystem = startApp() + expectAppStarted() + + for (i <- 1 to 5) { + val taskId = TaskId(0, 0) + appMaster.tell(UpdateClock(taskId, i), mockTask.ref) + mockTask.expectMsg(UpstreamMinClock(Long.MaxValue)) + val cause = s"message loss $i from $taskId" + appMaster.tell(MessageLoss(0, taskId, cause), mockTask.ref) + // appmaster restarted + expectAppStarted() + + appMaster.tell(GetLastFailure(appId), mockTask.ref) + val failure = mockTask.expectMsgType[LastFailure] + failure.error shouldBe cause + + appMaster.tell(GetLastFailure(invalidAppId), mockTask.ref) + mockTask.expectNoMsg() + } + + // fail to recover after restarting a tasks for 5 times + appMaster.tell(MessageLoss(0, TaskId(0, 0), "message loss"), mockTask.ref) + mockMaster.expectMsg(60.seconds, ShutdownApplication(appId)) + + workerSystem.terminate() + } + + "replay on client request" in { + startApp() + expectAppStarted() + + appMaster.tell(ReplayFromTimestampWindowTrailingEdge(appId), mockTask.ref) + expectAppStarted() + + 
appMaster.tell(ReplayFromTimestampWindowTrailingEdge(invalidAppId), mockTask.ref) + mockMaster.expectNoMsg() + } } def ignoreSaveAppData: PartialFunction[Any, Boolean] = { case msg: SaveAppData => true } + + def startApp(): ActorSystem = { + mockMaster.reply(ResourceAllocated(Array(ResourceAllocation(Resource(4), mockWorker.ref, + workerId)))) + mockWorker.expectMsgClass(classOf[LaunchExecutor]) + + val workerSystem = ActorSystem("worker", TestUtil.DEFAULT_CONFIG) + mockWorker.reply(RegisterActorSystem(ActorUtil.getSystemAddress(workerSystem).toString)) + workerSystem + } + + def expectAppStarted(): Unit = { + // wait for app to get started + mockMaster.expectMsg(ActivateAppMaster(appId)) + mockMaster.reply(AppMasterActivated(appId)) + } } object AppMasterSpec { @@ -218,9 +301,7 @@ object AppMasterSpec { class TaskA(taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) { - val master = userConf.getValue[ActorRef](AppMasterSpec.MASTER).get override def onStart(startTime: StartTime): Unit = { - master ! AppMasterSpec.TaskStarted } override def onNext(msg: Message): Unit = {} @@ -228,9 +309,7 @@ class TaskA(taskContext: TaskContext, userConf: UserConfig) extends Task(taskCon class TaskB(taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) { - val master = userConf.getValue[ActorRef](AppMasterSpec.MASTER).get override def onStart(startTime: StartTime): Unit = { - master ! 
AppMasterSpec.TaskStarted } override def onNext(msg: Message): Unit = {} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ae56903d/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ClockServiceSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ClockServiceSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ClockServiceSpec.scala index 0729877..e742a2c 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ClockServiceSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ClockServiceSpec.scala @@ -97,7 +97,7 @@ class ClockServiceSpec(_system: ActorSystem) extends TestKit(_system) with Impli task3 ~ hash ~> task2, task2 ~ hash ~> task4, task5 ~ hash ~> task1 - )) + ), version = 1) val user = TestProbe() clockService.tell(ChangeToNewDAG(dagAddMiddleNode), user.ref) @@ -147,17 +147,17 @@ class ClockServiceSpec(_system: ActorSystem) extends TestKit(_system) with Impli clockService ! ChangeToNewDAG(dagWithStateTasks) expectMsgType[ChangeToNewDAGSuccess] - clockService ! ReportCheckpointClock(taskId3, startClock) - clockService ! ReportCheckpointClock(taskId4, startClock) + clockService ! UpdateCheckpointClock(taskId3, startClock) + clockService ! UpdateCheckpointClock(taskId4, startClock) clockService ! GetStartClock expectMsg(StartClock(startClock)) - clockService ! ReportCheckpointClock(taskId3, 200L) - clockService ! ReportCheckpointClock(taskId4, 300L) + clockService ! UpdateCheckpointClock(taskId3, 200L) + clockService ! UpdateCheckpointClock(taskId4, 300L) clockService ! GetStartClock expectMsg(StartClock(startClock)) - clockService ! ReportCheckpointClock(taskId3, 300L) + clockService ! UpdateCheckpointClock(taskId3, 300L) clockService ! 
GetStartClock expectMsg(StartClock(300L)) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ae56903d/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala index dac5a5c..0dd3e5b 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala @@ -228,9 +228,10 @@ class TaskManagerSpec extends FlatSpec with Matchers with BeforeAndAfterEach { // Step9: start broadcasting TaskLocations. import scala.concurrent.duration._ - assert(executorManager.expectMsgPF(5.seconds) { - case BroadCast(taskLocationsReady) => taskLocationsReady.isInstanceOf[TaskLocationsReady] - }) + executorManager.expectMsgType[BroadCast](5.seconds) match { + case BroadCast(taskLocationsReady) => + taskLocationsReady shouldBe a [TaskLocationsReady] + } // Step10: Executor confirm it has received TaskLocationsReceived(version, executorId) taskManager.tell(TaskLocationsReceived(dag.version, executorId), executor.ref) @@ -241,9 +242,10 @@ class TaskManagerSpec extends FlatSpec with Matchers with BeforeAndAfterEach { // Step12: start all tasks import scala.concurrent.duration._ - assert(executorManager.expectMsgPF(5.seconds) { - case BroadCast(startAllTasks) => startAllTasks.isInstanceOf[StartAllTasks] - }) + executorManager.expectMsgType[BroadCast](5.seconds) match { + case BroadCast(startAllTasks) => + startAllTasks shouldBe a [StartAllTasks] + } // Step13, Tell ExecutorManager the updated usage status of executors. executorManager.expectMsgType[ExecutorResourceUsageSummary]
