http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/appmaster/AppMaster.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/appmaster/AppMaster.scala b/streaming/src/main/scala/io/gearpump/streaming/appmaster/AppMaster.scala index a3f2fc5..df85017 100644 --- a/streaming/src/main/scala/io/gearpump/streaming/appmaster/AppMaster.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/appmaster/AppMaster.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -19,17 +19,21 @@ package io.gearpump.streaming.appmaster import java.lang.management.ManagementFactory +import scala.concurrent.Future import akka.actor._ +import org.slf4j.Logger + import io.gearpump._ import io.gearpump.cluster.ClientToMaster.{GetLastFailure, GetStallingTasks, QueryHistoryMetrics, ShutdownApplication} import io.gearpump.cluster.MasterToAppMaster.{AppMasterDataDetailRequest, ReplayFromTimestampWindowTrailingEdge} -import io.gearpump.cluster.MasterToClient.{HistoryMetricsItem, HistoryMetrics, LastFailure} +import io.gearpump.cluster.MasterToClient.{HistoryMetrics, HistoryMetricsItem, LastFailure} import io.gearpump.cluster._ +import io.gearpump.cluster.worker.WorkerId import io.gearpump.metrics.Metrics.ReportMetrics import io.gearpump.metrics.{JvmMetricsSet, Metrics, MetricsReporterService} import io.gearpump.partitioner.PartitionerDescription -import io.gearpump.streaming.ExecutorToAppMaster.{UnRegisterTask, MessageLoss, RegisterExecutor, RegisterTask} +import io.gearpump.streaming.ExecutorToAppMaster.{MessageLoss, 
RegisterExecutor, RegisterTask, UnRegisterTask} import io.gearpump.streaming._ import io.gearpump.streaming.appmaster.AppMaster._ import io.gearpump.streaming.appmaster.DagManager.{GetLatestDAG, LatestDAG, ReplaceProcessor} @@ -42,30 +46,27 @@ import io.gearpump.streaming.util.ActorPathUtil import io.gearpump.util.Constants.{APPMASTER_DEFAULT_EXECUTOR_ID, _} import io.gearpump.util.HistoryMetricsService.HistoryMetricsConfig import io.gearpump.util._ -import org.slf4j.Logger - -import scala.concurrent.Future /** * AppMaster is the head of a streaming application. * * It contains: - * 1. ExecutorManager to manage all executors. - * 2. TaskManager to manage all tasks, - * 3. ClockService to track the global clock for this streaming application. - * 4. Scheduler to decide which a task should be scheduled to. - * + * 1. ExecutorManager to manage all executors. + * 2. TaskManager to manage all tasks, + * 3. ClockService to track the global clock for this streaming application. + * 4. Scheduler to decide which a task should be scheduled to. 
*/ -class AppMaster(appContext : AppMasterContext, app : AppDescription) extends ApplicationMaster { +class AppMaster(appContext: AppMasterContext, app: AppDescription) extends ApplicationMaster { import app.userConfig import appContext.{appId, masterProxy, username} - implicit val actorSystem = context.system - implicit val timeOut = FUTURE_TIMEOUT + private implicit val actorSystem = context.system + private implicit val timeOut = FUTURE_TIMEOUT + import akka.pattern.ask - implicit val dispatcher = context.dispatcher + private implicit val dispatcher = context.dispatcher - val startTime: TimeStamp = System.currentTimeMillis() + private val startTime: TimeStamp = System.currentTimeMillis() private val LOG: Logger = LogUtil.getLogger(getClass, app = appId) LOG.info(s"AppMaster[$appId] is launched by $username, app: $app xxxxxxxxxxxxxxxxx") @@ -83,11 +84,12 @@ class AppMaster(appContext : AppMasterContext, app : AppDescription) extends Ap private var lastFailure = LastFailure(0L, null) private val appMasterBrief = ExecutorBrief(APPMASTER_DEFAULT_EXECUTOR_ID, - self.path.toString, Option(appContext.workerInfo).map(_.workerId).getOrElse(WorkerId.unspecified), "active") + self.path.toString, + Option(appContext.workerInfo).map(_.workerId).getOrElse(WorkerId.unspecified), "active") private val getHistoryMetricsConfig = HistoryMetricsConfig(systemConfig) - val metricsEnabled = systemConfig.getBoolean(GEARPUMP_METRIC_ENABLED) + private val metricsEnabled = systemConfig.getBoolean(GEARPUMP_METRIC_ENABLED) private val userDir = System.getProperty("user.dir") private val logFile = LogUtil.applicationLogDir(actorSystem.settings.config) @@ -104,12 +106,15 @@ class AppMaster(appContext : AppMasterContext, app : AppDescription) extends Ap ) private val historyMetricsService = if (metricsEnabled) { - // register jvm metrics - Metrics(context.system).register(new JvmMetricsSet(s"app${appId}.executor${APPMASTER_DEFAULT_EXECUTOR_ID}")) + // Registers jvm metrics + 
Metrics(context.system).register(new JvmMetricsSet( + s"app${appId}.executor${APPMASTER_DEFAULT_EXECUTOR_ID}")) - val historyMetricsService = context.actorOf(Props(new HistoryMetricsService(s"app$appId", getHistoryMetricsConfig))) + val historyMetricsService = context.actorOf(Props(new HistoryMetricsService( + s"app$appId", getHistoryMetricsConfig))) - val metricsReportService = context.actorOf(Props(new MetricsReporterService(Metrics(context.system)))) + val metricsReportService = context.actorOf(Props( + new MetricsReporterService(Metrics(context.system)))) historyMetricsService.tell(ReportMetrics, metricsReportService) Some(historyMetricsService) @@ -129,13 +134,15 @@ class AppMaster(appContext : AppMasterContext, app : AppDescription) extends Ap jarScheduler, executorManager, clockService.get, self, app.name)))) } - override def receive : Receive = - taskMessageHandler orElse + override def receive: Receive = { + taskMessageHandler orElse executorMessageHandler orElse recover orElse appMasterService orElse ActorUtil.defaultMsgHandler(self) + } + /** Handles messages from Tasks */ def taskMessageHandler: Receive = { case clock: ClockEvent => taskManager.foreach(_ forward clock) @@ -143,7 +150,7 @@ class AppMaster(appContext : AppMasterContext, app : AppDescription) extends Ap taskManager.foreach(_ forward register) case unRegister: UnRegisterTask => taskManager.foreach(_ forward unRegister) - // check whether this processor dead, if it is, then we should remove it from clockService. + // Checks whether this processor is dead; if so, remove it from clockService. 
clockService.foreach(_ forward CheckProcessorDeath(unRegister.taskId.processorId)) case replay: ReplayFromTimestampWindowTrailingEdge => taskManager.foreach(_ forward replay) @@ -163,6 +170,7 @@ class AppMaster(appContext : AppMasterContext, app : AppDescription) extends Ap clockService.foreach(_ forward GetCheckpointClock) } + /** Handles messages from Executors */ def executorMessageHandler: Receive = { case register: RegisterExecutor => executorManager forward register @@ -170,6 +178,7 @@ class AppMaster(appContext : AppMasterContext, app : AppDescription) extends Ap historyMetricsService.foreach(_ forward ReportMetrics) } + /** Handles messages from AppMaster */ def appMasterService: Receive = { case appMasterDataDetailRequest: AppMasterDataDetailRequest => LOG.debug(s"AppMaster got AppMasterDataDetailRequest for $appId ") @@ -179,14 +188,17 @@ class AppMaster(appContext : AppMasterContext, app : AppDescription) extends Ap val taskFuture = getTaskList val dagFuture = getDAG - val appMasterDataDetail = for {executors <- executorsFuture + val appMasterDataDetail = for { + executors <- executorsFuture clock <- clockFuture tasks <- taskFuture dag <- dagFuture } yield { val graph = dag.graph - val executorToTasks = tasks.tasks.groupBy(_._2).mapValues {_.keys.toList} + val executorToTasks = tasks.tasks.groupBy(_._2).mapValues { + _.keys.toList + } val processors = dag.processors.map { kv => val processor = kv._2 @@ -195,7 +207,8 @@ class AppMaster(appContext : AppMasterContext, app : AppDescription) extends Ap (kv._1, TaskCount(kv._2.count(_.processorId == id))) }.filter(_._2.count != 0) (id, - ProcessorSummary(id, taskClass, parallelism, description, taskConf, life, tasks.keys.toList, tasks)) + ProcessorSummary(id, taskClass, parallelism, description, taskConf, life, + tasks.keys.toList, tasks)) } StreamAppMasterSummary( @@ -211,7 +224,7 @@ class AppMaster(appContext : AppMasterContext, app : AppDescription) extends Ap logFile = logFile.getAbsolutePath, processors = 
processors, processorLevels = graph.vertexHierarchyLevelMap(), - dag = graph.mapEdge {(node1, edge, node2) => + dag = graph.mapEdge { (node1, edge, node2) => edge.partitionerFactory.name }, executors = executors, @@ -221,16 +234,16 @@ class AppMaster(appContext : AppMasterContext, app : AppDescription) extends Ap val client = sender() - appMasterDataDetail.map{appData => + appMasterDataDetail.map { appData => client ! appData } -// TODO: WebSocket is buggy and disabled. -// case appMasterMetricsRequest: AppMasterMetricsRequest => -// val client = sender() -// actorSystem.eventStream.subscribe(client, classOf[MetricType]) + // TODO: WebSocket is buggy and disabled. + // case appMasterMetricsRequest: AppMasterMetricsRequest => + // val client = sender() + // actorSystem.eventStream.subscribe(client, classOf[MetricType]) case query: QueryHistoryMetrics => if (historyMetricsService.isEmpty) { - // return empty metrics so that we don't hang the UI + // Returns empty metrics so that we don't hang the UI sender ! HistoryMetrics(query.path, List.empty[HistoryMetricsItem]) } else { historyMetricsService.get forward query @@ -241,34 +254,37 @@ class AppMaster(appContext : AppMasterContext, app : AppDescription) extends Ap dagManager forward replaceDAG case GetLastFailure(_) => sender ! lastFailure - case get@ GetExecutorSummary(executorId) => + case get@GetExecutorSummary(executorId) => val client = sender if (executorId == APPMASTER_DEFAULT_EXECUTOR_ID) { client ! 
appMasterExecutorSummary } else { - ActorUtil.askActor[Map[ExecutorId, ExecutorInfo]](executorManager, GetExecutorInfo).map { map => - map.get(executorId).foreach { executor => - executor.executor.tell(get, client) + ActorUtil.askActor[Map[ExecutorId, ExecutorInfo]](executorManager, GetExecutorInfo) + .map { map => + map.get(executorId).foreach { executor => + executor.executor.tell(get, client) + } } - } } - case query@ QueryExecutorConfig(executorId) => + case query@QueryExecutorConfig(executorId) => val client = sender if (executorId == -1) { val systemConfig = context.system.settings.config sender ! ExecutorConfig(ClusterConfig.filterOutDefaultConfig(systemConfig)) } else { - ActorUtil.askActor[Map[ExecutorId, ExecutorInfo]](executorManager, GetExecutorInfo).map { map => - map.get(executorId).foreach { executor => - executor.executor.tell(query, client) + ActorUtil.askActor[Map[ExecutorId, ExecutorInfo]](executorManager, GetExecutorInfo) + .map { map => + map.get(executorId).foreach { executor => + executor.executor.tell(query, client) + } } - } } - } + } + /** Error handling */ def recover: Receive = { case FailedToRecover(errorMsg) => - if(context.children.toList.contains(sender())){ + if (context.children.toList.contains(sender())) { LOG.error(errorMsg) masterProxy ! 
ShutdownApplication(appId) } @@ -288,14 +304,15 @@ class AppMaster(appContext : AppMasterContext, app : AppDescription) extends Ap } private def executorBrief: Future[List[ExecutorBrief]] = { - ActorUtil.askActor[Map[ExecutorId, ExecutorInfo]](executorManager, GetExecutorInfo).map { infos => - infos.values.map { info => - ExecutorBrief(info.executorId, - info.executor.path.toSerializationFormat, - info.worker.workerId, - "active") - }.toList :+ appMasterBrief - } + ActorUtil.askActor[Map[ExecutorId, ExecutorInfo]](executorManager, GetExecutorInfo) + .map { infos => + infos.values.map { info => + ExecutorBrief(info.executorId, + info.executor.path.toSerializationFormat, + info.worker.workerId, + "active") + }.toList :+ appMasterBrief + } } private def getTaskList: Future[TaskList] = { @@ -312,10 +329,11 @@ class AppMaster(appContext : AppMasterContext, app : AppDescription) extends Ap } private def getUpdatedDAG(): DAG = { - val dag = DAG(userConfig.getValue[Graph[ProcessorDescription, PartitionerDescription]](StreamApplication.DAG).get) - val updated = dag.processors.map{ idAndProcessor => + val dag = DAG(userConfig.getValue[Graph[ProcessorDescription, + PartitionerDescription]](StreamApplication.DAG).get) + val updated = dag.processors.map { idAndProcessor => val (id, oldProcessor) = idAndProcessor - val newProcessor = if(oldProcessor.jar == null) { + val newProcessor = if (oldProcessor.jar == null) { oldProcessor.copy(jar = appContext.appJar.getOrElse(null)) } else { oldProcessor @@ -327,14 +345,18 @@ class AppMaster(appContext : AppMasterContext, app : AppDescription) extends Ap } object AppMaster { + + /** Master node doesn't return resource in time */ case object AllocateResourceTimeOut + /** Query task ActorRef by providing the taskId */ case class LookupTaskActorRef(taskId: TaskId) case class TaskActorRef(task: ActorRef) class ServiceNotAvailableException(reason: String) extends Exception(reason) - case class ExecutorBrief(executorId: ExecutorId, executor: 
String, workerId: WorkerId, status: String) + case class ExecutorBrief( + executorId: ExecutorId, executor: String, workerId: WorkerId, status: String) } \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/appmaster/ClockService.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/appmaster/ClockService.scala b/streaming/src/main/scala/io/gearpump/streaming/appmaster/ClockService.scala index 67fd592..bd18bdc 100644 --- a/streaming/src/main/scala/io/gearpump/streaming/appmaster/ClockService.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/appmaster/ClockService.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -21,45 +21,47 @@ package io.gearpump.streaming.appmaster import java.util import java.util.Date import java.util.concurrent.TimeUnit +import scala.concurrent.Future +import scala.concurrent.duration.FiniteDuration +import scala.language.implicitConversions import akka.actor.{Actor, Cancellable, Stash} +import org.slf4j.Logger + +import io.gearpump.TimeStamp +import io.gearpump.cluster.ClientToMaster.GetStallingTasks import io.gearpump.google.common.primitives.Longs -import io.gearpump.streaming.task._ +import io.gearpump.streaming.AppMasterToMaster.StallingTasks import io.gearpump.streaming._ +import io.gearpump.streaming.appmaster.ClockService.HealthChecker.ClockValue +import io.gearpump.streaming.appmaster.ClockService._ import io.gearpump.streaming.storage.AppDataStore -import io.gearpump.TimeStamp -import io.gearpump.cluster.ClientToMaster.GetStallingTasks -import AppMasterToMaster.StallingTasks -import ClockService.HealthChecker.ClockValue -import ClockService._ +import io.gearpump.streaming.task._ import 
io.gearpump.util.LogUtil -import org.slf4j.Logger - -import scala.concurrent.Future -import scala.concurrent.duration.FiniteDuration -import scala.language.implicitConversions /** - * The clockService will maintain a global view of message timestamp in the application + * Maintains a global view of message timestamp in the application */ -class ClockService(private var dag : DAG, store: AppDataStore) extends Actor with Stash { +class ClockService(private var dag: DAG, store: AppDataStore) extends Actor with Stash { private val LOG: Logger = LogUtil.getLogger(getClass) import context.dispatcher private val healthChecker = new HealthChecker(stallingThresholdSeconds = 60) - private var healthCheckScheduler : Cancellable = null - private var snapshotScheduler : Cancellable = null + private var healthCheckScheduler: Cancellable = null + private var snapshotScheduler: Cancellable = null - override def receive = null + override def receive: Receive = null - override def preStart() : Unit = { + override def preStart(): Unit = { LOG.info("Initializing Clock service, get snapshotted StartClock ....") store.get(START_CLOCK).asInstanceOf[Future[TimeStamp]].map { clock => val startClock = Option(clock).getOrElse(0L) minCheckpointClock = Some(startClock) + // Recover the application by restarting from last persisted startClock. + // Only messages after startClock will be replayed. self ! StoredStartClock(startClock) LOG.info(s"Start Clock Retrieved, starting ClockService, startClock: $startClock") } @@ -67,12 +69,15 @@ class ClockService(private var dag : DAG, store: AppDataStore) extends Actor wit context.become(waitForStartClock) } - override def postStop() : Unit = { + override def postStop(): Unit = { Option(healthCheckScheduler).map(_.cancel) Option(snapshotScheduler).map(_.cancel) } - var clocks = Map.empty[ProcessorId, ProcessorClock] + // Keep track of clock value of all processors. 
+ private var clocks = Map.empty[ProcessorId, ProcessorClock] + + // Each processor can have multiple upstream processors. This keeps track of the upstream clocks. private var upstreamClocks = Map.empty[ProcessorId, Array[ProcessorClock]] // We use Array instead of List for Performance consideration @@ -106,12 +111,12 @@ class ClockService(private var dag : DAG, store: AppDataStore) extends Actor wit private def recoverDag(dag: DAG, startClock: TimeStamp): Unit = { this.clocks = dag.processors.filter(startClock < _._2.life.death). map { pair => - val (processorId, processor) = pair - val parallelism = processor.parallelism - val clock = new ProcessorClock(processorId, processor.life, parallelism) - clock.init(startClock) - (processorId, clock) - } + val (processorId, processor) = pair + val parallelism = processor.parallelism + val clock = new ProcessorClock(processorId, processor.life, parallelism) + clock.init(startClock) + (processorId, clock) + } this.upstreamClocks = clocks.map { pair => val (processorId, processor) = pair @@ -150,7 +155,7 @@ class ClockService(private var dag : DAG, store: AppDataStore) extends Actor wit (processorId, upstreamClocks.toArray) } - // init the clock of all processors. + // Inits the clock of all processors. 
newClocks.map { pair => val (processorId, processorClock) = pair val upstreamClock = getUpStreamMinClock(processorId) @@ -174,11 +179,12 @@ class ClockService(private var dag : DAG, store: AppDataStore) extends Actor wit import context.dispatcher - //period report current clock - healthCheckScheduler = context.system.scheduler.schedule(new FiniteDuration(5, TimeUnit.SECONDS), + // Periodically report current clock + healthCheckScheduler = context.system.scheduler.schedule( + new FiniteDuration(5, TimeUnit.SECONDS), new FiniteDuration(60, TimeUnit.SECONDS), self, HealthCheck) - //period snpashot latest min startclock to external storage + // Periodically snapshot latest min startclock to external storage snapshotScheduler = context.system.scheduler.schedule(new FiniteDuration(5, TimeUnit.SECONDS), new FiniteDuration(5, TimeUnit.SECONDS), self, SnapshotStartClock) @@ -206,7 +212,7 @@ class ClockService(private var dag : DAG, store: AppDataStore) extends Actor wit case GetUpstreamMinClock(task) => sender ! 
UpstreamMinClock(getUpStreamMinClock(task.processorId)) - case update@ UpdateClock(task, clock) => + case update@UpdateClock(task, clock) => val upstreamMinClock = getUpStreamMinClock(task.processorId) val processorClock = clocks.get(task.processorId) @@ -233,7 +239,8 @@ class ClockService(private var dag : DAG, store: AppDataStore) extends Actor wit LOG.info(s"Removing $processorId from clock service...") removeProcessor(processorId) } else { - LOG.info(s"Unsuccessfully in removing $processorId from clock service..., min: ${processorClock.get.min}, life: $life") + LOG.info(s"Unsuccessfully in removing $processorId from clock service...," + + s" min: ${processorClock.get.min}, life: $life") } } case HealthCheck => @@ -253,15 +260,15 @@ class ClockService(private var dag : DAG, store: AppDataStore) extends Actor wit case ChangeToNewDAG(dag) => if (dag.version > this.dag.version) { - // transit to a new dag version + // Transits to a new dag version this.dag = dag dynamicDAG(dag, getStartClock) } else { - // restart current dag. + // Restarts current dag. recoverDag(dag, getStartClock) } - LOG.info(s"Change to new DAG(dag = ${dag.version}), send back ChangeToNewDAGSuccess") - sender ! ChangeToNewDAGSuccess(clocks.map{ pair => + LOG.info(s"Change to new DAG(dag = ${dag.version}), send back ChangeToNewDAGSuccess") + sender ! ChangeToNewDAGSuccess(clocks.map { pair => val (id, clock) = pair (id, clock.min) }) @@ -279,7 +286,7 @@ class ClockService(private var dag : DAG, store: AppDataStore) extends Actor wit upstreamClocks = upstreamClocks - processorId - // remove dead processor from checkpoints. + // Removes dead processor from checkpoints. 
checkpointClocks = checkpointClocks.filter { kv => val (taskId, processor) = kv taskId.processorId != processorId @@ -290,12 +297,13 @@ class ClockService(private var dag : DAG, store: AppDataStore) extends Actor wit ProcessorClocks.minClock(processorClocks) } - def selfCheck() : Unit = { + def selfCheck(): Unit = { val minTimestamp = minClock if (Long.MaxValue == minTimestamp) { processorClocks.foreach { clock => - LOG.info(s"Processor ${clock.processorId} Clock: min: ${clock.min}, taskClocks: "+ clock.taskClocks.mkString(",")) + LOG.info(s"Processor ${clock.processorId} Clock: min: ${clock.min}, " + + s"taskClocks: " + clock.taskClocks.mkString(",")) } } @@ -306,7 +314,7 @@ class ClockService(private var dag : DAG, store: AppDataStore) extends Actor wit minCheckpointClock.getOrElse(minClock) } - private def snapshotStartClock() : Unit = { + private def snapshotStartClock(): Unit = { store.put(START_CLOCK, getStartClock) } @@ -329,10 +337,11 @@ object ClockService { case object HealthCheck class ProcessorClock(val processorId: ProcessorId, val life: LifeTime, val parallism: Int, - private var _min: TimeStamp = 0L, private var _taskClocks: Array[TimeStamp] = null) { - + private var _min: TimeStamp = 0L, private var _taskClocks: Array[TimeStamp] = null) { - def copy(life: LifeTime): ProcessorClock = new ProcessorClock(processorId, life, parallism, _min, _taskClocks) + def copy(life: LifeTime): ProcessorClock = { + new ProcessorClock(processorId, life, parallism, _min, _taskClocks) + } def min: TimeStamp = _min def taskClocks: Array[TimeStamp] = _taskClocks @@ -362,17 +371,22 @@ object ClockService { private val LOG: Logger = LogUtil.getLogger(getClass) private var minClock: ClockValue = null - private val stallingThresholdMilliseconds = stallingThresholdSeconds * 1000 // 60 seconds + private val stallingThresholdMilliseconds = stallingThresholdSeconds * 1000 + // 60 seconds private var stallingTasks = Array.empty[TaskId] - def check(currentMinClock: TimeStamp, 
processorClocks: Map[ProcessorId, ProcessorClock], dag: DAG, now: TimeStamp): Unit = { + /** Check for stalling tasks */ + def check( + currentMinClock: TimeStamp, processorClocks: Map[ProcessorId, ProcessorClock], + dag: DAG, now: TimeStamp): Unit = { var isClockStalling = false if (null == minClock || currentMinClock > minClock.appClock) { minClock = ClockValue(systemClock = now, appClock = currentMinClock) } else { - //clock not advancing + // Clock not advancing if (now > minClock.systemClock + stallingThresholdMilliseconds) { - LOG.warn(s"Clock has not advanced for ${(now - minClock.systemClock)/1000} seconds since ${minClock.prettyPrint}...") + LOG.warn(s"Clock has not advanced for ${(now - minClock.systemClock) / 1000} seconds " + + s"since ${minClock.prettyPrint}...") isClockStalling = true } } @@ -387,7 +401,7 @@ object ClockService { } } - processorId.foreach {processorId => + processorId.foreach { processorId => val processorClock = processorClocks(processorId) val taskClocks = processorClock.taskClocks stallingTasks = taskClocks.zipWithIndex.filter(_._1 == minClock.appClock). 
@@ -414,10 +428,11 @@ object ClockService { object ProcessorClocks { + // Get the Min clock of all processors def minClock(clock: Array[ProcessorClock]): TimeStamp = { var i = 0 var min = if (clock.length == 0) 0L else clock(0).min - while(i < clock.length) { + while (i < clock.length) { min = Math.min(min, clock(i).min) i += 1 } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/appmaster/DagManager.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/appmaster/DagManager.scala b/streaming/src/main/scala/io/gearpump/streaming/appmaster/DagManager.scala index 7244f95..f18b387 100644 --- a/streaming/src/main/scala/io/gearpump/streaming/appmaster/DagManager.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/appmaster/DagManager.scala @@ -18,44 +18,48 @@ package io.gearpump.streaming.appmaster -import akka.actor.{ActorRef, Actor, Stash} -import io.gearpump.streaming._ -import io.gearpump.streaming.task.Subscriber -import io.gearpump.streaming.storage.AppDataStore -import io.gearpump.cluster.UserConfig -import io.gearpump.partitioner.PartitionerDescription -import DagManager._ -import io.gearpump.util.{LogUtil, Graph} +import scala.concurrent.Future + +import akka.actor.{Actor, ActorRef, Stash} import org.slf4j.Logger -import scala.concurrent.Future +import io.gearpump.cluster.UserConfig +import io.gearpump.partitioner.PartitionerDescription +import io.gearpump.streaming._ +import io.gearpump.streaming.appmaster.DagManager._ +import io.gearpump.streaming.storage.AppDataStore +import io.gearpump.streaming.task.Subscriber +import io.gearpump.util.{Graph, LogUtil} /** - * Will handle dag modification and other stuff related with DAG - */ - + * Handles dag modification and other stuff related with DAG + * + * DagManager maintains multiple version of DAGs. For each version, the DAG is immutable. 
+ * For operations like modifying a processor, it will create a new version of DAG. + */ class DagManager(appId: Int, userConfig: UserConfig, store: AppDataStore, dag: Option[DAG]) - extends Actor with Stash { + extends Actor with Stash { + import context.dispatcher private val LOG: Logger = LogUtil.getLogger(getClass, app = appId) private val NOT_INITIALIZED = -1 private var dags = List.empty[DAG] private var maxProcessorId = -1 - implicit val system = context.system + private implicit val system = context.system private var watchers = List.empty[ActorRef] override def receive: Receive = null - override def preStart() : Unit = { + override def preStart(): Unit = { LOG.info("Initializing Dag Service, get stored Dag ....") store.get(StreamApplication.DAG).asInstanceOf[Future[DAG]].map { storedDag => if (storedDag != null) { dags :+= storedDag } else { dags :+= dag.getOrElse(DAG(userConfig.getValue[Graph[ProcessorDescription, - PartitionerDescription]](StreamApplication.DAG).get)) + PartitionerDescription]](StreamApplication.DAG).get)) } maxProcessorId = { val keys = dags.head.processors.keys @@ -91,20 +95,25 @@ class DagManager(appId: Int, userConfig: UserConfig, store: AppDataStore, dag: O def dagService: Receive = { case GetLatestDAG => + // Get the latest version of DAG. sender ! LatestDAG(dags.last) case GetTaskLaunchData(version, processorId, context) => + // Task information like Processor class, downstream subscriber processors and etc. dags.find(_.version == version).foreach { dag => LOG.info(s"Get task launcher data for processor: $processorId, dagVersion: $version") sender ! taskLaunchData(dag, processorId, context) } case ReplaceProcessor(oldProcessorId, inputNewProcessor) => + // Replace a processor with new implementation. The upstream processors and downstream + // processors are NOT changed. 
var newProcessor = inputNewProcessor.copy(id = nextProcessorId) - if (inputNewProcessor.jar == null){ + if (inputNewProcessor.jar == null) { val oldJar = dags.last.processors.get(oldProcessorId).get newProcessor = newProcessor.copy(jar = oldJar.jar) } if (dags.length > 1) { - sender ! DAGOperationFailed("We are in the process of handling previous dynamic dag change") + sender ! DAGOperationFailed( + "We are in the process of handling previous dynamic dag change") } else { val oldDAG = dags.last val newVersion = oldDAG.version + 1 @@ -118,21 +127,25 @@ class DagManager(appId: Int, userConfig: UserConfig, store: AppDataStore, dag: O } case WatchChange(watcher) => + // Checks whether there are modifications for this DAG. if (!this.watchers.contains(watcher)) { this.watchers :+= watcher } case NewDAGDeployed(dagVersion) => - // means the new DAG version has been successfully deployed. - // remove obsolete versions. + // Means dynamic Dag transition completed, and the new DAG version has been successfully + // deployed. The obsolete dag versions will be removed. 
if (dagVersion != NOT_INITIALIZED) { dags = dags.filter(_.version == dagVersion) store.put(StreamApplication.DAG, dags.last) } } - private def replaceDAG(dag: DAG, oldProcessorId: ProcessorId, newProcessor: ProcessorDescription, newVersion: Int): DAG = { - val oldProcessorLife = LifeTime(dag.processors(oldProcessorId).life.birth, newProcessor.life.birth) + private def replaceDAG( + dag: DAG, oldProcessorId: ProcessorId, newProcessor: ProcessorDescription, newVersion: Int) + : DAG = { + val oldProcessorLife = LifeTime(dag.processors(oldProcessorId).life.birth, + newProcessor.life.birth) val newProcessorMap = dag.processors ++ Map(oldProcessorId -> dag.processors(oldProcessorId).copy(life = oldProcessorLife), @@ -153,11 +166,13 @@ object DagManager { case class LatestDAG(dag: DAG) case class GetTaskLaunchData(dagVersion: Int, processorId: Int, context: AnyRef = null) - case class TaskLaunchData(processorDescription : ProcessorDescription, subscribers: List[Subscriber], context: AnyRef = null) + case class TaskLaunchData(processorDescription : ProcessorDescription, + subscribers: List[Subscriber], context: AnyRef = null) sealed trait DAGOperation - case class ReplaceProcessor(oldProcessorId: ProcessorId, newProcessorDescription: ProcessorDescription) extends DAGOperation + case class ReplaceProcessor(oldProcessorId: ProcessorId, + newProcessorDescription: ProcessorDescription) extends DAGOperation sealed trait DAGOperationResult case object DAGOperationSuccess extends DAGOperationResult http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/appmaster/ExecutorManager.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/appmaster/ExecutorManager.scala b/streaming/src/main/scala/io/gearpump/streaming/appmaster/ExecutorManager.scala index ebe6c11..ab99af5 100644 --- 
a/streaming/src/main/scala/io/gearpump/streaming/appmaster/ExecutorManager.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/appmaster/ExecutorManager.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,34 +18,32 @@ package io.gearpump.streaming.appmaster +import scala.concurrent.duration._ +import scala.util.{Failure, Try} + import akka.actor.SupervisorStrategy.Stop import akka.actor._ import akka.remote.RemoteScope import com.typesafe.config.Config -import io.gearpump.WorkerId +import org.apache.commons.lang.exception.ExceptionUtils + import io.gearpump.cluster.AppMasterToWorker.ChangeExecutorResource import io.gearpump.cluster.appmaster.ExecutorSystemScheduler.{ExecutorSystemJvmConfig, ExecutorSystemStarted, StartExecutorSystemTimeout, StartExecutorSystems} import io.gearpump.cluster.appmaster.WorkerInfo import io.gearpump.cluster.scheduler.{Resource, ResourceRequest} +import io.gearpump.cluster.worker.WorkerId import io.gearpump.cluster.{AppJar, AppMasterContext, ExecutorContext, UserConfig} import io.gearpump.streaming.ExecutorId import io.gearpump.streaming.ExecutorToAppMaster.RegisterExecutor import io.gearpump.streaming.appmaster.ExecutorManager._ import io.gearpump.streaming.executor.Executor import io.gearpump.util.{LogUtil, Util} -import org.apache.commons.lang.exception.ExceptionUtils - -import scala.concurrent.duration._ -import scala.util.Try /** - * ExecutorManager manage all executors. + * ExecutorManager manage the start and stop of all executors. * * ExecutorManager will launch Executor when asked. It hide the details like starting - * a new ExecutorSystem from user. 
- * - * Please use ExecutorManager.props() to construct this actor - * + * a new ExecutorSystem from user. Please use ExecutorManager.props() to construct this actor */ private[appmaster] class ExecutorManager( userConfig: UserConfig, @@ -60,10 +58,11 @@ private[appmaster] class ExecutorManager( import appContext.{appId, masterProxy, username} private var taskManager: ActorRef = null - implicit val actorSystem = context.system + + private implicit val actorSystem = context.system private val systemConfig = context.system.settings.config - private var executors = Map.empty[Int, ExecutorInfo] + private var executors = Map.empty[Int, ExecutorInfo] def receive: Receive = waitForTaskManager @@ -73,18 +72,26 @@ private[appmaster] class ExecutorManager( context.become(service orElse terminationWatch) } - override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) { + // If something wrong on executor, ExecutorManager will stop the current executor, + // and wait for AppMaster to start a new executor. + override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, + withinTimeRange = 1.minute) { case ex: Throwable => val executorId = Try(sender.path.name.toInt) executorId match { case scala.util.Success(id) => { executors -= id - LOG.error(s"Executor $id throws exception, stop it...\n" + ExceptionUtils.getStackTrace(ex)) + LOG.error(s"Executor $id throws exception, stop it...\n" + + ExceptionUtils.getStackTrace(ex)) + } + case Failure(ex) => { + LOG.error(s"Sender ${sender.path} is dead, but seems it is not an executor...") } } Stop } + // Responds to outside queries def service: Receive = { case StartExecutors(resources, jar) => masterProxy ! 
StartExecutorSystems(resources, getExecutorJvmConfig(Some(jar))) @@ -92,30 +99,31 @@ private[appmaster] class ExecutorManager( import executorSystem.{address, executorSystemId, resource => executorResource, worker} val executorId = executorSystemId - val executorContext = ExecutorContext(executorId, worker, appId, appName, appMaster = context.parent, executorResource) + val executorContext = ExecutorContext(executorId, worker, appId, appName, + appMaster = context.parent, executorResource) executors += executorId -> ExecutorInfo(executorId, null, worker, boundedJar) - //start executor - val executor = context.actorOf(executorFactory(executorContext, userConfig, address, executorId), - executorId.toString) + // Starts executor + val executor = context.actorOf(executorFactory(executorContext, userConfig, + address, executorId), executorId.toString) executorSystem.bindLifeCycleWith(executor) case StartExecutorSystemTimeout => taskManager ! StartExecutorsTimeOut case RegisterExecutor(executor, executorId, resource, worker) => LOG.info(s"executor $executorId has been launched") - //watch for executor termination + // Watches for executor termination context.watch(executor) val executorInfo = executors.get(executorId).get executors += executorId -> executorInfo.copy(executor = executor) taskManager ! ExecutorStarted(executorId, resource, worker.workerId, executorInfo.boundedJar) - // broadcast message to all executors + // Broadcasts message to all executors case BroadCast(msg) => LOG.info(s"Broadcast ${msg.getClass.getSimpleName} to all executors") - context.children.foreach(_ forward msg) + context.children.foreach(_ forward msg) - // unicast message to single executor + // Unicasts message to single executor case UniCast(executorId, msg) => LOG.info(s"Unicast ${msg.getClass.getSimpleName} to executor $executorId") val executor = executors.get(executorId) @@ -124,13 +132,14 @@ private[appmaster] class ExecutorManager( case GetExecutorInfo => sender ! 
executors - // update resource usage, so that reclaim unused resource. + // Tells Executor manager resources that are occupied. The Executor Manager can use this + // information to tell worker to reclaim un-used resources case ExecutorResourceUsageSummary(resources) => executors.foreach { pair => val (executorId, executor) = pair val resource = resources.get(executorId) val worker = executor.worker.ref - // notify the worker the actual resource used by this application. + // Notifies the worker the actual resource used by this application. resource match { case Some(resource) => worker ! ChangeExecutorResource(appId, executorId, resource) @@ -138,9 +147,9 @@ private[appmaster] class ExecutorManager( worker ! ChangeExecutorResource(appId, executorId, Resource(0)) } } - } + } - def terminationWatch : Receive = { + def terminationWatch: Receive = { case Terminated(actor) => val executorId = Try(actor.path.name.toInt) executorId match { @@ -149,14 +158,17 @@ private[appmaster] class ExecutorManager( LOG.error(s"Executor $id is down") taskManager ! 
ExecutorStopped(id) } - case scala.util.Failure(ex) => LOG.error(s"failed to get the executor Id from path string ${actor.path}" , ex) + case scala.util.Failure(ex) => + LOG.error(s"failed to get the executor Id from path string ${actor.path}", ex) } } private def getExecutorJvmConfig(jar: Option[AppJar]): ExecutorSystemJvmConfig = { val executorAkkaConfig = clusterConfig val jvmSetting = Util.resolveJvmSetting(executorAkkaConfig.withFallback(systemConfig)).executor - ExecutorSystemJvmConfig(jvmSetting.classPath, jvmSetting.vmargs, jar, username, executorAkkaConfig) + + ExecutorSystemJvmConfig(jvmSetting.classPath, jvmSetting.vmargs, jar, + username, executorAkkaConfig) } } @@ -168,26 +180,30 @@ private[appmaster] object ExecutorManager { case object GetExecutorInfo - case class ExecutorStarted(executorId: Int, resource: Resource, workerId: WorkerId, boundedJar: Option[AppJar]) + case class ExecutorStarted( + executorId: Int, resource: Resource, workerId: WorkerId, boundedJar: Option[AppJar]) case class ExecutorStopped(executorId: Int) case class SetTaskManager(taskManager: ActorRef) case object StartExecutorsTimeOut - def props(userConfig: UserConfig, appContext: AppMasterContext, clusterConfig: Config, appName: String): Props = { + def props( + userConfig: UserConfig, appContext: AppMasterContext, clusterConfig: Config, appName: String) + : Props = { val executorFactory = - (executorContext: ExecutorContext, - userConfig: UserConfig, - address: Address, - executorId: ExecutorId) => - Props(classOf[Executor], executorContext, userConfig) - .withDeploy(Deploy(scope = RemoteScope(address))) + (executorContext: ExecutorContext, + userConfig: UserConfig, + address: Address, + executorId: ExecutorId) => + Props(classOf[Executor], executorContext, userConfig) + .withDeploy(Deploy(scope = RemoteScope(address))) Props(new ExecutorManager(userConfig, appContext, executorFactory, clusterConfig, appName)) } case class ExecutorResourceUsageSummary(resources: Map[ExecutorId, 
Resource]) - case class ExecutorInfo(executorId: ExecutorId, executor: ActorRef, worker: WorkerInfo, boundedJar: Option[AppJar]) + case class ExecutorInfo( + executorId: ExecutorId, executor: ActorRef, worker: WorkerInfo, boundedJar: Option[AppJar]) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/appmaster/JarScheduler.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/appmaster/JarScheduler.scala b/streaming/src/main/scala/io/gearpump/streaming/appmaster/JarScheduler.scala index e769a0d..a2a9c74 100644 --- a/streaming/src/main/scala/io/gearpump/streaming/appmaster/JarScheduler.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/appmaster/JarScheduler.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -17,51 +17,69 @@ */ package io.gearpump.streaming.appmaster +import scala.concurrent.Future + import akka.actor._ +import akka.pattern.ask import com.typesafe.config.Config -import io.gearpump.{WorkerId, TimeStamp} -import io.gearpump.streaming.task.{StartClock, TaskId} -import io.gearpump.streaming.{ProcessorDescription, DAG} + +import io.gearpump.TimeStamp import io.gearpump.cluster.AppJar import io.gearpump.cluster.scheduler.{Resource, ResourceRequest} +import io.gearpump.cluster.worker.WorkerId import io.gearpump.partitioner.PartitionerDescription import io.gearpump.streaming.appmaster.JarScheduler._ -import io.gearpump.util.{LogUtil, Constants, Graph} -import akka.pattern.ask -import scala.concurrent.Future +import io.gearpump.streaming.task.TaskId +import 
io.gearpump.streaming.{DAG, ProcessorDescription} +import io.gearpump.util.{Constants, Graph, LogUtil} /** + * Different processors of the stream application can use different jars. JarScheduler is the + * scheduler for different jars. * - * With JarScheduler, we allows a DAG to be partitioned into several - * parts, with each part use its own jar file. + * For a DAG of multiple processors, each processor can have its own jar. Tasks of same jar + * is scheduled by TaskScheduler, and TaskSchedulers are scheduled by JarScheduler. * + * In runtime, the implementation is delegated to actor JarSchedulerImpl */ -class JarScheduler(appId : Int, appName: String, config: Config, factory: ActorRefFactory) { +class JarScheduler(appId: Int, appName: String, config: Config, factory: ActorRefFactory) { private val actor: ActorRef = factory.actorOf(Props(new JarSchedulerImpl(appId, appName, config))) private implicit val dispatcher = factory.dispatcher private implicit val timeout = Constants.FUTURE_TIMEOUT + /** Set the current DAG version active */ def setDag(dag: DAG, startClock: Future[TimeStamp]): Unit = { actor ! TransitToNewDag - startClock.map {start => + startClock.map { start => actor ! NewDag(dag, start) } } - def getRequestDetails(): Future[Array[ResourceRequestDetail]] = { - (actor ? GetRequestDetails).asInstanceOf[Future[Array[ResourceRequestDetail]]] + /** AppMaster ask JarScheduler about how many resource it wants */ + def getResourceRequestDetails(): Future[Array[ResourceRequestDetail]] = { + (actor ? GetResourceRequestDetails).asInstanceOf[Future[Array[ResourceRequestDetail]]] } - def scheduleTask(appJar: AppJar, workerId: WorkerId, executorId: Int, resource: Resource): Future[List[TaskId]] = { - (actor ? ScheduleTask(appJar, workerId, executorId, resource)).asInstanceOf[Future[List[TaskId]]] + /** + * AppMaster has resource allocated, and ask the jar scheduler to schedule tasks + * for this executor. 
+ */ + def scheduleTask(appJar: AppJar, workerId: WorkerId, executorId: Int, resource: Resource) + : Future[List[TaskId]] = { + (actor ? ScheduleTask(appJar, workerId, executorId, resource)) + .asInstanceOf[Future[List[TaskId]]] } + /** + * Some executor JVM process is dead. AppMaster asks jar scheduler to re-schedule the impacted + * tasks. + */ def executorFailed(executorId: Int): Future[Option[ResourceRequestDetail]] = { (actor ? ExecutorFailed(executorId)).asInstanceOf[Future[Option[ResourceRequestDetail]]] } } -object JarScheduler{ +object JarScheduler { case class ResourceRequestDetail(jar: AppJar, requests: Array[ResourceRequest]) @@ -69,14 +87,24 @@ object JarScheduler{ case object TransitToNewDag - case object GetRequestDetails + case object GetResourceRequestDetails + /** + * Schedule tasks for one appJar. + * + * @param appJar Application jar. + * @param workerId Worker machine Id. + * @param executorId Executor Id. + * @param resource Slots that are available. + */ case class ScheduleTask(appJar: AppJar, workerId: WorkerId, executorId: Int, resource: Resource) + /** Some executor JVM is dead, try to recover tasks that are located on failed executor */ case class ExecutorFailed(executorId: Int) - class JarSchedulerImpl(appId : Int, appName: String, config: Config) extends Actor with Stash { + class JarSchedulerImpl(appId: Int, appName: String, config: Config) extends Actor with Stash { + // Each TaskScheduler maps to a jar. 
private var taskSchedulers = Map.empty[AppJar, TaskScheduler] private val LOG = LogUtil.getLogger(getClass) @@ -84,26 +112,29 @@ object JarScheduler{ def receive: Receive = waitForNewDag def waitForNewDag: Receive = { - case TransitToNewDag => // continue current state + case TransitToNewDag => // Continue current state case NewDag(dag, startTime) => LOG.info(s"Init JarScheduler, dag version: ${dag.version}, startTime: $startTime") val processors = dag.processors.values.groupBy(_.jar) + taskSchedulers = processors.map { jarAndProcessors => val (jar, processors) = jarAndProcessors - //Construct the sub DAG - val graph = Graph.empty[ProcessorDescription, PartitionerDescription] - processors.foreach{processor => + // Construct the sub DAG, each sub DAG maps to a separate jar. + val subGraph = Graph.empty[ProcessorDescription, PartitionerDescription] + processors.foreach { processor => if (startTime < processor.life.death) { - graph.addVertex(processor) + subGraph.addVertex(processor) } } - val subDag = DAG(graph) - val taskScheduler = taskSchedulers.getOrElse(jar, new TaskSchedulerImpl(appId, appName, config)) + val subDagForSingleJar = DAG(subGraph) - LOG.info(s"Set DAG for TaskScheduler, count: " + subDag.processors.size) - taskScheduler.setDAG(subDag) + val taskScheduler = taskSchedulers + .getOrElse(jar, new TaskSchedulerImpl(appId, appName, config)) + + LOG.info(s"Set DAG for TaskScheduler, count: " + subDagForSingleJar.processors.size) + taskScheduler.setDAG(subDagForSingleJar) jar -> taskScheduler } unstashAll() @@ -113,15 +144,20 @@ object JarScheduler{ } def ready: Receive = { + // Notifies there is a new DAG coming. case TransitToNewDag => context.become(waitForNewDag) - case GetRequestDetails => + + case GetResourceRequestDetails => + + // Asks each TaskScheduler (Each for one jar) the resource requests. 
val result: Array[ResourceRequestDetail] = taskSchedulers.map { jarAndScheduler => val (jar, scheduler) = jarAndScheduler ResourceRequestDetail(jar, scheduler.getResourceRequests()) }.toArray LOG.info(s"GetRequestDetails " + result.mkString(";")) sender ! result + case ScheduleTask(appJar, workerId, executorId, resource) => val result: List[TaskId] = taskSchedulers.get(appJar).map { scheduler => scheduler.schedule(workerId, executorId, resource) @@ -130,9 +166,9 @@ object JarScheduler{ sender ! result case ExecutorFailed(executorId) => val result: Option[ResourceRequestDetail] = taskSchedulers. - find(_._2.scheduledTasks(executorId).nonEmpty).map{ jarAndScheduler => - ResourceRequestDetail(jarAndScheduler._1, jarAndScheduler._2.executorFailed(executorId)) - } + find(_._2.scheduledTasks(executorId).nonEmpty).map { jarAndScheduler => + ResourceRequestDetail(jarAndScheduler._1, jarAndScheduler._2.executorFailed(executorId)) + } LOG.info(s"ExecutorFailed " + result.mkString(";")) sender ! result } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/appmaster/StreamAppMasterSummary.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/appmaster/StreamAppMasterSummary.scala b/streaming/src/main/scala/io/gearpump/streaming/appmaster/StreamAppMasterSummary.scala index cdcd3f6..3b2c8bf 100644 --- a/streaming/src/main/scala/io/gearpump/streaming/appmaster/StreamAppMasterSummary.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/appmaster/StreamAppMasterSummary.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,15 +18,16 @@ package io.gearpump.streaming.appmaster -import io.gearpump.streaming.{ExecutorId, ProcessorId, LifeTime} import io.gearpump._ import io.gearpump.cluster.AppMasterToMaster.AppMasterSummary -import io.gearpump.cluster.{UserConfig, MasterToAppMaster} import io.gearpump.cluster.MasterToAppMaster.AppMasterStatus -import AppMaster.ExecutorBrief +import io.gearpump.cluster.{MasterToAppMaster, UserConfig} +import io.gearpump.streaming.appmaster.AppMaster.ExecutorBrief +import io.gearpump.streaming.{ExecutorId, LifeTime, ProcessorId} import io.gearpump.util.Graph import io.gearpump.util.HistoryMetricsService.HistoryMetricsConfig +/** Stream application summary, used for REST API */ case class StreamAppMasterSummary( appType: String = "streaming", appId: Int, @@ -42,7 +43,7 @@ case class StreamAppMasterSummary( dag: Graph[ProcessorId, String] = null, executors: List[ExecutorBrief] = null, processors: Map[ProcessorId, ProcessorSummary] = Map.empty[ProcessorId, ProcessorSummary], - // hiearachy level for each processor + // Hiearachy level for each processor processorLevels: Map[ProcessorId, Int] = Map.empty[ProcessorId, Int], historyMetricsConfig: HistoryMetricsConfig = null) extends AppMasterSummary @@ -50,11 +51,11 @@ case class StreamAppMasterSummary( case class TaskCount(count: Int) case class ProcessorSummary( - id: ProcessorId, - taskClass: String, - parallelism : Int, - description: String, - taskConf: UserConfig, - life: LifeTime, - executors: List[ExecutorId], - taskCount: Map[ExecutorId, TaskCount]) \ No newline at end of file + id: ProcessorId, + taskClass: String, + parallelism: Int, + description: String, + taskConf: UserConfig, + life: LifeTime, + executors: 
List[ExecutorId], + taskCount: Map[ExecutorId, TaskCount]) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskLocator.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskLocator.scala b/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskLocator.scala index c456a06..6e7ebc6 100644 --- a/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskLocator.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskLocator.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -17,35 +17,38 @@ */ package io.gearpump.streaming.appmaster -import com.typesafe.config.{ConfigValueFactory, ConfigFactory, ConfigRenderOptions, Config} -import TaskLocator.{Localities, WorkerLocality, NonLocality, Locality} -import io.gearpump.WorkerId +import scala.collection.JavaConverters._ +import scala.util.Try + +import com.typesafe.config.{Config, ConfigFactory, ConfigRenderOptions, ConfigValueFactory} + +import io.gearpump.cluster.worker.WorkerId import io.gearpump.streaming.Constants +import io.gearpump.streaming.appmaster.TaskLocator.{Localities, Locality, NonLocality, WorkerLocality} import io.gearpump.streaming.task.TaskId -import scala.util.Try -import scala.collection.JavaConverters._ /** * TaskLocator is used to decide which machine one task should run on. * - * User can specify config [[Constants.GEARPUMP_STREAMING_LOCALITIES]] to decide - * to control which machine the task is running on. 
+ * User can specify config [[io.gearpump.streaming.Constants#GEARPUMP_STREAMING_LOCALITIES]] to + * decide to control which machine the task is running on. */ class TaskLocator(appName: String, config: Config) { private val taskLocalities: Map[TaskId, Locality] = loadTaskLocalities(config) - def locateTask(taskId: TaskId) : Locality = { + /** Finds where a task should belongs to */ + def locateTask(taskId: TaskId): Locality = { taskLocalities.getOrElse(taskId, NonLocality) } - private def loadTaskLocalities(config: Config) : Map[TaskId, Locality] = { - import Constants.GEARPUMP_STREAMING_LOCALITIES - Try(config.getConfig(s"$GEARPUMP_STREAMING_LOCALITIES.$appName")).map {appConfig => + private def loadTaskLocalities(config: Config): Map[TaskId, Locality] = { + import io.gearpump.streaming.Constants.GEARPUMP_STREAMING_LOCALITIES + Try(config.getConfig(s"$GEARPUMP_STREAMING_LOCALITIES.$appName")).map { appConfig => val json = appConfig.root().render(ConfigRenderOptions.concise) Localities.fromJson(json) }.map { localityConfig => import localityConfig.localities - localities.keySet.flatMap {workerId => + localities.keySet.flatMap { workerId => val tasks = localities(workerId) tasks.map((_, WorkerLocality(workerId))) }.toArray.toMap @@ -57,15 +60,20 @@ object TaskLocator { trait Locality + /** Means we require the resource from the specific worker */ case class WorkerLocality(workerId: WorkerId) extends Locality + /** Means no preference on worker */ object NonLocality extends Locality + /** Localities settings. Mapping from workerId to list of taskId */ case class Localities(localities: Map[WorkerId, Array[TaskId]]) object Localities { val pattern = "task_([0-9]+)_([0-9]+)".r + // To avoid polluting the classpath, we do the JSON translation ourself instead of + // introducing JSON library dependencies directly. 
def fromJson(json: String): Localities = { val localities = ConfigFactory.parseString(json).getAnyRef("localities") .asInstanceOf[java.util.Map[String, String]].asScala.map { pair => @@ -80,8 +88,9 @@ object TaskLocator { } def toJson(localities: Localities): String = { - val map = localities.localities.toList.map {pair => - (WorkerId.render(pair._1), pair._2.map(task => s"task_${task.processorId}_${task.index}").mkString(",")) + val map = localities.localities.toList.map { pair => + (WorkerId.render(pair._1), pair._2.map(task => + s"task_${task.processorId}_${task.index}").mkString(",")) }.toMap.asJava ConfigFactory.empty().withValue("localities", ConfigValueFactory.fromAnyRef(map)). root.render(ConfigRenderOptions.concise()) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskManager.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskManager.scala b/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskManager.scala index e1f4872..51f2f9c 100644 --- a/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskManager.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskManager.scala @@ -18,12 +18,17 @@ package io.gearpump.streaming.appmaster +import scala.concurrent.Future +import scala.concurrent.duration._ + import akka.actor._ import akka.pattern.ask +import org.slf4j.Logger + import io.gearpump.TimeStamp import io.gearpump.cluster.MasterToAppMaster.ReplayFromTimestampWindowTrailingEdge import io.gearpump.streaming.AppMasterToExecutor._ -import io.gearpump.streaming.ExecutorToAppMaster.{UnRegisterTask, MessageLoss, RegisterTask} +import io.gearpump.streaming.ExecutorToAppMaster.{MessageLoss, RegisterTask, UnRegisterTask} import io.gearpump.streaming._ import io.gearpump.streaming.appmaster.AppMaster.{AllocateResourceTimeOut, LookupTaskActorRef, 
TaskActorRef} import io.gearpump.streaming.appmaster.ClockService.{ChangeToNewDAG, ChangeToNewDAGSuccess} @@ -36,31 +41,26 @@ import io.gearpump.streaming.executor.ExecutorRestartPolicy import io.gearpump.streaming.task._ import io.gearpump.streaming.util.ActorPathUtil import io.gearpump.util.{Constants, LogUtil} -import org.slf4j.Logger - -import scala.concurrent.Future -import scala.concurrent.duration._ /** * * TaskManager track all tasks's status. * * It is state machine with three states: - * 1. applicationReady - * 2. recovery - * 3. dynamicDag + * 1. applicationReady + * 2. recovery + * 3. dynamicDag * * When in state applicationReady: - * 1. When there is message-loss or JVM crash, transit to state recovery. - * 2. When user modify the DAG, transit to dynamicDag. + * 1. When there is message-loss or JVM crash, transit to state recovery. + * 2. When user modify the DAG, transit to dynamicDag. * * When in state recovery: - * 1. When all tasks has been recovered, transit to applicationReady. + * 1. When all tasks has been recovered, transit to applicationReady. * * When in state dynamicDag: - * 1. When dyanmic dag transition is complete, transit to applicationReady. - * 2. When there is message loss or JVM crash, transit to state recovery. - * + * 1. When dynamic dag transition is complete, transit to applicationReady. + * 2. When there is message loss or JVM crash, transit to state recovery. 
*/ private[appmaster] class TaskManager( appId: Int, @@ -73,13 +73,16 @@ private[appmaster] class TaskManager( extends Actor { private val LOG: Logger = LogUtil.getLogger(getClass, app = appId) - val systemConfig = context.system.settings.config + private val systemConfig = context.system.settings.config private val ids = new SessionIdFactory() - private val executorRestartPolicy = new ExecutorRestartPolicy(maxNrOfRetries = 5, withinTimeRange = 20 seconds) - implicit val timeout = Constants.FUTURE_TIMEOUT - implicit val actorSystem = context.system + private val executorRestartPolicy = new ExecutorRestartPolicy(maxNrOfRetries = 5, + withinTimeRange = 20.seconds) + + private implicit val timeout = Constants.FUTURE_TIMEOUT + private implicit val actorSystem = context.system + import context.dispatcher dagManager ! WatchChange(watcher = self) @@ -91,7 +94,6 @@ private[appmaster] class TaskManager( private var startClock: Future[TimeStamp] = getStartClock - def receive: Receive = applicationReady(DagReadyState.empty) private def onClientQuery(taskRegistry: TaskRegistry): Receive = { @@ -104,14 +106,14 @@ private[appmaster] class TaskManager( val requestor = sender executorId.map { executorId => val taskPath = ActorPathUtil.taskActorPath(appMaster, executorId, taskId) - context.actorSelection(taskPath).resolveOne(3 seconds).map { taskActorRef => + context.actorSelection(taskPath).resolveOne(3.seconds).map { taskActorRef => requestor ! TaskActorRef(taskActorRef) } } } /** - * state applicationReady + * State applicationReady */ def applicationReady(state: DagReadyState): Receive = { executorManager ! 
state.taskRegistry.usedResource @@ -121,7 +123,7 @@ private[appmaster] class TaskManager( val recoverRegistry = new TaskRegistry(expectedTasks = state.dag.tasks, deadTasks = state.taskRegistry.deadTasks) - + val recoverState = new StartDagState(state.dag, recoverRegistry) val onError: Receive = { @@ -153,8 +155,8 @@ private[appmaster] class TaskManager( val dagDiff = migrate(state.dag, newDag) jarScheduler.setDag(newDag, startClock) - val resourceRequestsDetails = jarScheduler.getRequestDetails() - resourceRequestsDetails.map{ details => + val resourceRequestsDetails = jarScheduler.getResourceRequestDetails() + resourceRequestsDetails.map { details => details.foreach { detail => if (detail.requests.length > 0 && detail.requests.exists(!_.resource.isEmpty)) { executorManager ! StartExecutors(detail.requests, detail.jar) @@ -168,7 +170,8 @@ private[appmaster] class TaskManager( executors.foreach { pair => val (executorId, tasks) = pair modifiedTasks ++= tasks - dagManager ! GetTaskLaunchData(newDag.version, processorId, ChangeTasksOnExecutor(executorId, tasks)) + dagManager ! 
GetTaskLaunchData(newDag.version, processorId, + ChangeTasksOnExecutor(executorId, tasks)) } } @@ -200,12 +203,13 @@ private[appmaster] class TaskManager( context.become(applicationReady(newState)) } - // recover to same version - onClientQuery(state.taskRegistry) orElse onError orElse onNewDag orElse onUnRegisterTask orElse unHandled("applicationReady") + // Recovers to same version + onClientQuery(state.taskRegistry) orElse onError orElse onNewDag orElse + onUnRegisterTask orElse unHandled("applicationReady") } /** - * state dynamicDag + * State dynamicDag */ def dynamicDag(state: StartDagState, recoverState: StartDagState): Receive = { LOG.info(s"DynamicDag transit to dag version: ${state.dag.version}...") @@ -231,11 +235,13 @@ private[appmaster] class TaskManager( case executor: ExecutorStarted => import executor.{boundedJar, executorId, resource, workerId} val taskIdsFuture = jarScheduler.scheduleTask(boundedJar.get, workerId, executorId, resource) - taskIdsFuture.foreach {taskIds => - LOG.info(s"Executor $executor has been started, start to schedule tasks: ${taskIds.mkString(",")}") + taskIdsFuture.foreach { taskIds => + LOG.info(s"Executor $executor has been started, " + + s"start to schedule tasks: ${taskIds.mkString(",")}") taskIds.groupBy(_.processorId).foreach { pair => val (processorId, tasks) = pair - dagManager ! GetTaskLaunchData(state.dag.version, processorId, StartTasksOnExecutor(executor.executorId, tasks)) + dagManager ! 
GetTaskLaunchData(state.dag.version, processorId, + StartTasksOnExecutor(executor.executorId, tasks)) } } @@ -250,7 +256,8 @@ private[appmaster] class TaskManager( tasks.foreach(executorRestartPolicy.addTaskToExecutor(executorId, _)) case ChangeTasksOnExecutor(executorId, tasks) => LOG.info("change Task on executor: " + executorId + ", tasks: " + tasks) - val changeTasks = ChangeTasks(tasks, state.dag.version, processorDescription.life, subscribers) + val changeTasks = ChangeTasks(tasks, state.dag.version, processorDescription.life, + subscribers) executorManager ! UniCast(executorId, changeTasks) case other => LOG.error(s"severe error! we expect ExecutorStarted but get ${other.getClass.toString}") @@ -258,7 +265,7 @@ private[appmaster] class TaskManager( case TasksLaunched => // We will track all launched task by message RegisterTask case TasksChanged(tasks) => - tasks.foreach(task =>state.taskChangeRegistry.taskChanged(task)) + tasks.foreach(task => state.taskChangeRegistry.taskChanged(task)) if (allTasksReady(state)) { broadcastLocations(state) @@ -306,10 +313,11 @@ private[appmaster] class TaskManager( def onExecutorError: Receive = { case ExecutorStopped(executorId) => - if(executorRestartPolicy.allowRestartExecutor(executorId)) { + if (executorRestartPolicy.allowRestartExecutor(executorId)) { jarScheduler.executorFailed(executorId).foreach { resourceRequestDetail => if (resourceRequestDetail.isDefined) { - executorManager ! StartExecutors(resourceRequestDetail.get.requests, resourceRequestDetail.get.jar) + executorManager ! 
StartExecutors(resourceRequestDetail.get.requests, + resourceRequestDetail.get.jar) } } } else { @@ -330,7 +338,7 @@ private[appmaster] class TaskManager( } /** - * state recovery + * State recovery */ def recovery(state: StartDagState): Receive = { val recoverDagVersion = state.dag.version @@ -344,7 +352,7 @@ private[appmaster] class TaskManager( LOG.info(s"goto state Recovery(recoverDag = $recoverDagVersion)...") val ignoreClock: Receive = { case clock: ClockEvent => - //ignore clock events. + // Ignores clock events. } if (state.dag.isEmpty) { @@ -354,7 +362,8 @@ private[appmaster] class TaskManager( deadTasks = state.taskRegistry.deadTasks) val recoverState = new StartDagState(state.dag, registry) - ignoreClock orElse startDag(state, recoverState) orElse onExecutorError orElse unHandled("recovery") + ignoreClock orElse startDag(state, recoverState) orElse onExecutorError orElse + unHandled("recovery") } } @@ -364,19 +373,17 @@ private[appmaster] class TaskManager( } } -private [appmaster] object TaskManager { +private[appmaster] object TaskManager { /** * When application is ready, then transit to DagReadyState */ - class DagReadyState( - val dag: DAG, - val taskRegistry: TaskRegistry) + class DagReadyState(val dag: DAG, val taskRegistry: TaskRegistry) object DagReadyState { def empty: DagReadyState = { new DagReadyState( - DAG.empty().copy(version = -1), + DAG.empty.copy(version = -1), new TaskRegistry(List.empty[TaskId])) } } @@ -385,10 +392,10 @@ private [appmaster] object TaskManager { * When application is booting up or doing recovery, it use StartDagState */ class StartDagState( - val dag: DAG, - val taskRegistry: TaskRegistry, - val taskChangeRegistry: TaskChangeRegistry = new TaskChangeRegistry(List.empty[TaskId]), - val executorReadyRegistry: ExecutorRegistry = new ExecutorRegistry) + val dag: DAG, + val taskRegistry: TaskRegistry, + val taskChangeRegistry: TaskChangeRegistry = new TaskChangeRegistry(List.empty[TaskId]), + val executorReadyRegistry: 
ExecutorRegistry = new ExecutorRegistry) case object GetTaskList @@ -397,18 +404,17 @@ private [appmaster] object TaskManager { case class FailedToRecover(errorMsg: String) /** - * Start new Tasks on Executor <executorId> + * Starts new Tasks on Executor executorId */ case class StartTasksOnExecutor(executorId: Int, tasks: List[TaskId]) /** - * Change existing tasks on executor <executorId> + * Changes existing tasks on executor executorId */ case class ChangeTasksOnExecutor(executorId: Int, tasks: List[TaskId]) - /** - * Track the registration of all new started executors. + * Tracks the registration of all new started executors. */ class ExecutorRegistry { private var registeredExecutors = Set.empty[ExecutorId] @@ -423,7 +429,7 @@ private [appmaster] object TaskManager { } /** - * Track the registration of all changed tasks. + * Tracks the registration of all changed tasks. */ class TaskChangeRegistry(targetTasks: List[TaskId]) { private var registeredTasks = Set.empty[TaskId] @@ -443,12 +449,12 @@ private [appmaster] object TaskManager { * DAGDiff is used to track impacted processors when doing dynamic dag. 
*/ case class DAGDiff( - addedProcessors: List[ProcessorId], - modifiedProcessors: List[ProcessorId], - impactedUpstream: List[ProcessorId]) + addedProcessors: List[ProcessorId], + modifiedProcessors: List[ProcessorId], + impactedUpstream: List[ProcessorId]) /** - * Migrate from old DAG to new DAG, return DAGDiff + * Migrates from old DAG to new DAG, return DAGDiff */ def migrate(leftDAG: DAG, rightDAG: DAG): DAGDiff = { val left = leftDAG.processors.keySet @@ -457,19 +463,19 @@ private [appmaster] object TaskManager { val added = right -- left val join = right -- added - val modified = join.filter {processorId => + val modified = join.filter { processorId => leftDAG.processors(processorId) != rightDAG.processors(processorId) } val upstream = (list: Set[ProcessorId]) => { - list.flatMap {processorId => + list.flatMap { processorId => rightDAG.graph.incomingEdgesOf(processorId).map(_._1).toSet } -- list } val impactedUpstream = upstream(added ++ modified) - // all upstream will be affected. + // All upstream tasks are affected, and should be handled properly. 
DAGDiff(added.toList, modified.toList, impactedUpstream.toList) } @@ -480,7 +486,7 @@ private [appmaster] object TaskManager { private var nextSessionId = 1 /** - * return a new session Id for new task + * Returns a new session Id for new task */ final def newSessionId: Int = { val sessionId = nextSessionId http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskRegistry.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskRegistry.scala b/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskRegistry.scala index 79371ea..adfdeba 100644 --- a/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskRegistry.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskRegistry.scala @@ -18,18 +18,18 @@ package io.gearpump.streaming.appmaster -import io.gearpump.streaming.{ExecutorId, ProcessorId} -import io.gearpump.streaming.task.TaskId +import org.slf4j.Logger + import io.gearpump.cluster.scheduler.Resource -import ExecutorManager.ExecutorResourceUsageSummary -import TaskRegistry._ +import io.gearpump.streaming.appmaster.ExecutorManager.ExecutorResourceUsageSummary +import io.gearpump.streaming.appmaster.TaskRegistry._ +import io.gearpump.streaming.task.TaskId +import io.gearpump.streaming.{ExecutorId, ProcessorId} import io.gearpump.transport.HostPort import io.gearpump.util.LogUtil -import org.slf4j.Logger /** - * TaskRegistry is used to track the registration of all tasks - * when one application is booting up. + * Tracks the registration of all tasks, when the application is booting up. */ class TaskRegistry(val expectedTasks: List[TaskId], var registeredTasks: Map[TaskId, TaskLocation] = Map.empty[TaskId, TaskLocation], @@ -43,6 +43,10 @@ class TaskRegistry(val expectedTasks: List[TaskId], * When a task is booted, it need to call registerTask to register itself. 
* If this task is valid, then accept it, otherwise reject it. * + * @param taskId Task that register itself to TaskRegistry. + * @param location The host and port where this task is running on. NOTE: The host and port + * is NOT the same host and port of Akka remoting. Instead, it is host and port + * of custom netty layer, see [[io.gearpump.transport.netty.Context]]. */ def registerTask(taskId: TaskId, location: TaskLocation): RegisterTaskStatus = { val processorId = taskId.processorId @@ -57,13 +61,13 @@ class TaskRegistry(val expectedTasks: List[TaskId], } def copy(expectedTasks: List[TaskId] = this.expectedTasks, - registeredTasks: Map[TaskId, TaskLocation] = this.registeredTasks, - deadTasks: Set[TaskId] = this.deadTasks): TaskRegistry = { + registeredTasks: Map[TaskId, TaskLocation] = this.registeredTasks, + deadTasks: Set[TaskId] = this.deadTasks): TaskRegistry = { new TaskRegistry(expectedTasks, registeredTasks, deadTasks) } def getTaskLocations: TaskLocations = { - val taskLocations = registeredTasks.toList.groupBy(_._2.host).map{ pair => + val taskLocations = registeredTasks.toList.groupBy(_._2.host).map { pair => val (k, v) = pair val taskIds = v.map(_._1) (k, taskIds.toSet) @@ -74,16 +78,18 @@ class TaskRegistry(val expectedTasks: List[TaskId], def getTaskExecutorMap: Map[TaskId, ExecutorId] = { getTaskLocations.locations.flatMap { pair => val (hostPort, taskSet) = pair - taskSet.map{ taskId => + taskSet.map { taskId => (taskId, getExecutorId(taskId).getOrElse(-1)) } } } + /** Query the executor Id where the task is running on */ def getExecutorId(taskId: TaskId): Option[Int] = { registeredTasks.get(taskId).map(_.executorId) } + /** Gets list of allocated executor Ids */ def executors: List[ExecutorId] = { registeredTasks.toList.map(_._2.executorId) } @@ -101,6 +107,7 @@ class TaskRegistry(val expectedTasks: List[TaskId], registeredTasks.keys.toList.filter(_.processorId == processorId) } + /** List of executors that current processor taks are running on 
*/ def processorExecutors(processorId: ProcessorId): Map[ExecutorId, List[TaskId]] = { val taskToExecutor = filterTasks(processorId).flatMap { taskId => getExecutorId(taskId).map { executorId => @@ -108,15 +115,16 @@ class TaskRegistry(val expectedTasks: List[TaskId], } } - val executorToTasks = taskToExecutor.groupBy(_._2).map{kv => + val executorToTasks = taskToExecutor.groupBy(_._2).map { kv => val (k, v) = kv (k, v.map(_._1)) } executorToTasks } + /** Summary about how many resources are used for all running tasks */ def usedResource: ExecutorResourceUsageSummary = { - val resourceMap = registeredTasks.foldLeft(Map.empty[ExecutorId, Resource]) {(map, task) => + val resourceMap = registeredTasks.foldLeft(Map.empty[ExecutorId, Resource]) { (map, task) => val resource = map.getOrElse(task._2.executorId, Resource(0)) + Resource(1) map + (task._2.executorId -> resource) } @@ -131,5 +139,5 @@ object TaskRegistry { case class TaskLocation(executorId: Int, host: HostPort) - case class TaskLocations(locations : Map[HostPort, Set[TaskId]]) + case class TaskLocations(locations: Map[HostPort, Set[TaskId]]) } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskSchedulerImpl.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskSchedulerImpl.scala b/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskSchedulerImpl.scala index 39f427a..62dff6c 100644 --- a/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskSchedulerImpl.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskSchedulerImpl.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,17 +18,19 @@ package io.gearpump.streaming.appmaster import com.typesafe.config.Config -import io.gearpump.{WorkerId, TimeStamp} + import io.gearpump.cluster.scheduler.{Relaxation, Resource, ResourceRequest} +import io.gearpump.cluster.worker.WorkerId import io.gearpump.streaming.DAG import io.gearpump.streaming.appmaster.TaskLocator.{Locality, WorkerLocality} import io.gearpump.streaming.appmaster.TaskScheduler.{Location, TaskStatus} import io.gearpump.streaming.task.TaskId -import io.gearpump.util.{Constants, LogUtil} -import org.slf4j.Logger +import io.gearpump.util.Constants /** - * This schedules tasks to run for new allocated resources. + * Schedules tasks to run for new allocated resources. TaskScheduler only schedule tasks that + * share the same jar. For scheduling for multiple jars, see + * [[io.gearpump.streaming.appmaster.JarScheduler]]. */ trait TaskScheduler { @@ -44,8 +46,8 @@ trait TaskScheduler { def getResourceRequests(): Array[ResourceRequest] /** - * This notify the scheduler that a resource slot on {workerId} and {executorId} is allocated, and - * expect a task to be scheduled in return. + * This notifies the scheduler that a resource slot on {workerId} and {executorId} is allocated + * , and expect a task to be scheduled in return. * Task locality should be considered when deciding whether to offer a task on target {worker} * and {executor} * @@ -53,24 +55,24 @@ trait TaskScheduler { * @param executorId which executorId this resource belongs to. 
* @return a list of tasks */ - def schedule(workerId : WorkerId, executorId: Int, resource: Resource) : List[TaskId] + def schedule(workerId: WorkerId, executorId: Int, resource: Resource): List[TaskId] /** - * This notify the scheduler that {executorId} is failed, and expect a set of + * This notifies the scheduler that {executorId} is failed, and expect a set of * ResourceRequest for all failed tasks on that executor. * * @param executorId executor that failed * @return resource requests of the failed executor */ - def executorFailed(executorId: Int) : Array[ResourceRequest] + def executorFailed(executorId: Int): Array[ResourceRequest] /** - * Query the task list that already scheduled on the executor - * - * @param executorId executor to query - * @return a list of tasks - */ - def scheduledTasks(executorId: Int) : List[TaskId] + * Queries the task list that already scheduled on the executor + * + * @param executorId executor to query + * @return a list of tasks + */ + def scheduledTasks(executorId: Int): List[TaskId] } object TaskScheduler { @@ -79,12 +81,12 @@ object TaskScheduler { class TaskStatus(val taskId: TaskId, val preferLocality: Locality, var allocation: Location) } -class TaskSchedulerImpl(appId : Int, appName: String, config: Config) extends TaskScheduler { +class TaskSchedulerImpl(appId: Int, appName: String, config: Config) extends TaskScheduler { private val executorNum = config.getInt(Constants.APPLICATION_EXECUTOR_NUMBER) private var tasks = List.empty[TaskStatus] - // find the locality of the tasks + // Finds the locality of the tasks private val taskLocator = new TaskLocator(appName, config) override def setDAG(dag: DAG): Unit = { @@ -96,15 +98,15 @@ class TaskSchedulerImpl(appId : Int, appName: String, config: Config) extends T } } - def getResourceRequests(): Array[ResourceRequest] ={ + def getResourceRequests(): Array[ResourceRequest] = { fetchResourceRequests(fromOneWorker = false) } - import Relaxation._ - private def 
fetchResourceRequests(fromOneWorker: Boolean = false): Array[ResourceRequest] ={ + import io.gearpump.cluster.scheduler.Relaxation._ + private def fetchResourceRequests(fromOneWorker: Boolean = false): Array[ResourceRequest] = { var workersResourceRequest = Map.empty[WorkerId, Resource] - tasks.filter(_.allocation == null).foreach{task => + tasks.filter(_.allocation == null).foreach { task => task.preferLocality match { case WorkerLocality(workerId) => val current = workersResourceRequest.getOrElse(workerId, Resource.empty) @@ -116,7 +118,7 @@ class TaskSchedulerImpl(appId : Int, appName: String, config: Config) extends T } } - workersResourceRequest.map {workerIdAndResource => + workersResourceRequest.map { workerIdAndResource => val (workerId, resource) = workerIdAndResource if (workerId == WorkerId.unspecified) { ResourceRequest(resource, workerId = WorkerId.unspecified, executorNum = executorNum) @@ -126,23 +128,26 @@ class TaskSchedulerImpl(appId : Int, appName: String, config: Config) extends T }.toArray } - override def schedule(workerId : WorkerId, executorId: Int, resource: Resource) : List[TaskId] = { + override def schedule(workerId: WorkerId, executorId: Int, resource: Resource): List[TaskId] = { var scheduledTasks = List.empty[TaskId] val location = Location(workerId, executorId) - // schedule tasks for specific worker + // Schedules tasks for specific worker scheduledTasks ++= scheduleTasksForLocality(resource, location, (locality) => locality == WorkerLocality(workerId)) - // schedule tasks without specific location preference - scheduledTasks ++= scheduleTasksForLocality(resource - Resource(scheduledTasks.length), location, (locality) => true) + // Schedules tasks without specific location preference + scheduledTasks ++= scheduleTasksForLocality(resource - Resource(scheduledTasks.length), + location, (locality) => true) scheduledTasks } - private def scheduleTasksForLocality(resource: Resource, resourceLocation: Location, matcher: (Locality) => 
Boolean): List[TaskId] = { + private def scheduleTasksForLocality( + resource: Resource, resourceLocation: Location, matcher: (Locality) => Boolean) + : List[TaskId] = { var scheduledTasks = List.empty[TaskId] var index = 0 var remain = resource.slots - while(index < tasks.length && remain > 0) { + while (index < tasks.length && remain > 0) { val taskStatus = tasks(index) if (taskStatus.allocation == null && matcher(taskStatus.preferLocality)) { taskStatus.allocation = resourceLocation @@ -154,18 +159,19 @@ class TaskSchedulerImpl(appId : Int, appName: String, config: Config) extends T scheduledTasks } - override def executorFailed(executorId: Int) : Array[ResourceRequest] = { + override def executorFailed(executorId: Int): Array[ResourceRequest] = { val failedTasks = tasks.filter { status => status.allocation != null && status.allocation.executorId == executorId } - // clean the location of failed tasks + // Cleans the location of failed tasks failedTasks.foreach(_.allocation = null) - Array(ResourceRequest(Resource(failedTasks.length), workerId = WorkerId.unspecified, relaxation = ONEWORKER)) + Array(ResourceRequest(Resource(failedTasks.length), + workerId = WorkerId.unspecified, relaxation = ONEWORKER)) } override def scheduledTasks(executorId: Int): List[TaskId] = { - tasks.filter{ status => + tasks.filter { status => status.allocation != null && status.allocation.executorId == executorId }.map(_.taskId) }
