http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/appmaster/ClockService.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/appmaster/ClockService.scala b/streaming/src/main/scala/io/gearpump/streaming/appmaster/ClockService.scala deleted file mode 100644 index bd18bdc..0000000 --- a/streaming/src/main/scala/io/gearpump/streaming/appmaster/ClockService.scala +++ /dev/null @@ -1,447 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.gearpump.streaming.appmaster - -import java.util -import java.util.Date -import java.util.concurrent.TimeUnit -import scala.concurrent.Future -import scala.concurrent.duration.FiniteDuration -import scala.language.implicitConversions - -import akka.actor.{Actor, Cancellable, Stash} -import org.slf4j.Logger - -import io.gearpump.TimeStamp -import io.gearpump.cluster.ClientToMaster.GetStallingTasks -import io.gearpump.google.common.primitives.Longs -import io.gearpump.streaming.AppMasterToMaster.StallingTasks -import io.gearpump.streaming._ -import io.gearpump.streaming.appmaster.ClockService.HealthChecker.ClockValue -import io.gearpump.streaming.appmaster.ClockService._ -import io.gearpump.streaming.storage.AppDataStore -import io.gearpump.streaming.task._ -import io.gearpump.util.LogUtil - -/** - * Maintains a global view of message timestamp in the application - */ -class ClockService(private var dag: DAG, store: AppDataStore) extends Actor with Stash { - private val LOG: Logger = LogUtil.getLogger(getClass) - - import context.dispatcher - - private val healthChecker = new HealthChecker(stallingThresholdSeconds = 60) - private var healthCheckScheduler: Cancellable = null - private var snapshotScheduler: Cancellable = null - - override def receive: Receive = null - - override def preStart(): Unit = { - LOG.info("Initializing Clock service, get snapshotted StartClock ....") - store.get(START_CLOCK).asInstanceOf[Future[TimeStamp]].map { clock => - val startClock = Option(clock).getOrElse(0L) - - minCheckpointClock = Some(startClock) - - // Recover the application by restarting from last persisted startClock. - // Only messge after startClock will be replayed. - self ! 
StoredStartClock(startClock) - LOG.info(s"Start Clock Retrieved, starting ClockService, startClock: $startClock") - } - - context.become(waitForStartClock) - } - - override def postStop(): Unit = { - Option(healthCheckScheduler).map(_.cancel) - Option(snapshotScheduler).map(_.cancel) - } - - // Keep track of clock value of all processors. - private var clocks = Map.empty[ProcessorId, ProcessorClock] - - // Each process can have multiple upstream processors. This keep track of the upstream clocks. - private var upstreamClocks = Map.empty[ProcessorId, Array[ProcessorClock]] - - // We use Array instead of List for Performance consideration - private var processorClocks = Array.empty[ProcessorClock] - - private var checkpointClocks: Map[TaskId, Vector[TimeStamp]] = null - - private var minCheckpointClock: Option[TimeStamp] = None - - private def checkpointEnabled(processor: ProcessorDescription): Boolean = { - val taskConf = processor.taskConf - taskConf != null && taskConf.getBoolean("state.checkpoint.enable") == Some(true) - } - - private def resetCheckpointClocks(dag: DAG, startClock: TimeStamp): Unit = { - this.checkpointClocks = dag.processors.filter(startClock < _._2.life.death) - .filter { case (_, processor) => - checkpointEnabled(processor) - }.flatMap { case (id, processor) => - (0 until processor.parallelism).map(TaskId(id, _) -> Vector.empty[TimeStamp]) - } - if (this.checkpointClocks.isEmpty) { - minCheckpointClock = None - } - } - - private def initDag(startClock: TimeStamp): Unit = { - recoverDag(this.dag, startClock) - } - - private def recoverDag(dag: DAG, startClock: TimeStamp): Unit = { - this.clocks = dag.processors.filter(startClock < _._2.life.death). 
- map { pair => - val (processorId, processor) = pair - val parallelism = processor.parallelism - val clock = new ProcessorClock(processorId, processor.life, parallelism) - clock.init(startClock) - (processorId, clock) - } - - this.upstreamClocks = clocks.map { pair => - val (processorId, processor) = pair - - val upstreams = dag.graph.incomingEdgesOf(processorId).map(_._1) - val upstreamClocks = upstreams.flatMap(clocks.get(_)) - (processorId, upstreamClocks.toArray) - } - - this.processorClocks = clocks.toArray.map(_._2) - - resetCheckpointClocks(dag, startClock) - } - - private def dynamicDAG(dag: DAG, startClock: TimeStamp): Unit = { - val newClocks = dag.processors.filter(startClock < _._2.life.death). - map { pair => - val (processorId, processor) = pair - val parallelism = processor.parallelism - - val clock = if (clocks.contains(processor.id)) { - clocks(processorId).copy(life = processor.life) - } else { - new ProcessorClock(processorId, processor.life, parallelism) - } - (processorId, clock) - } - - this.clocks = newClocks - - this.upstreamClocks = newClocks.map { pair => - val (processorId, processor) = pair - - val upstreams = dag.graph.incomingEdgesOf(processorId).map(_._1) - val upstreamClocks = upstreams.flatMap(newClocks.get(_)) - (processorId, upstreamClocks.toArray) - } - - // Inits the clock of all processors. 
- newClocks.map { pair => - val (processorId, processorClock) = pair - val upstreamClock = getUpStreamMinClock(processorId) - val birth = processorClock.life.birth - - if (dag.graph.inDegreeOf(processorId) == 0) { - processorClock.init(Longs.max(birth, startClock)) - } else { - processorClock.init(upstreamClock) - } - } - - this.processorClocks = clocks.toArray.map(_._2) - - resetCheckpointClocks(dag, startClock) - } - - def waitForStartClock: Receive = { - case StoredStartClock(startClock) => - initDag(startClock) - - import context.dispatcher - - // Period report current clock - healthCheckScheduler = context.system.scheduler.schedule( - new FiniteDuration(5, TimeUnit.SECONDS), - new FiniteDuration(60, TimeUnit.SECONDS), self, HealthCheck) - - // Period snpashot latest min startclock to external storage - snapshotScheduler = context.system.scheduler.schedule(new FiniteDuration(5, TimeUnit.SECONDS), - new FiniteDuration(5, TimeUnit.SECONDS), self, SnapshotStartClock) - - unstashAll() - context.become(clockService) - - case _ => - stash() - } - - private def getUpStreamMinClock(processorId: ProcessorId): TimeStamp = { - val clocks = upstreamClocks.get(processorId) - if (clocks.isDefined) { - if (clocks.get == null || clocks.get.length == 0) { - Long.MaxValue - } else { - ProcessorClocks.minClock(clocks.get) - } - } else { - Long.MaxValue - } - } - - def clockService: Receive = { - case GetUpstreamMinClock(task) => - sender ! UpstreamMinClock(getUpStreamMinClock(task.processorId)) - - case update@UpdateClock(task, clock) => - val upstreamMinClock = getUpStreamMinClock(task.processorId) - - val processorClock = clocks.get(task.processorId) - if (processorClock.isDefined) { - processorClock.get.updateMinClock(task.index, clock) - } else { - LOG.error(s"Cannot updateClock for task $task") - } - sender ! UpstreamMinClock(upstreamMinClock) - - case GetLatestMinClock => - sender ! LatestMinClock(minClock) - - case GetStartClock => - sender ! 
StartClock(getStartClock) - - case deathCheck: CheckProcessorDeath => - val processorId = deathCheck.processorId - val processorClock = clocks.get(processorId) - if (processorClock.isDefined) { - val life = processorClock.get.life - if (processorClock.get.min >= life.death) { - - LOG.info(s"Removing $processorId from clock service...") - removeProcessor(processorId) - } else { - LOG.info(s"Unsuccessfully in removing $processorId from clock service...," + - s" min: ${processorClock.get.min}, life: $life") - } - } - case HealthCheck => - selfCheck() - - case SnapshotStartClock => - snapshotStartClock() - - case ReportCheckpointClock(task, time) => - updateCheckpointClocks(task, time) - - case GetCheckpointClock => - sender ! CheckpointClock(minCheckpointClock) - - case getStalling: GetStallingTasks => - sender ! StallingTasks(healthChecker.getReport.stallingTasks) - - case ChangeToNewDAG(dag) => - if (dag.version > this.dag.version) { - // Transits to a new dag version - this.dag = dag - dynamicDAG(dag, getStartClock) - } else { - // Restarts current dag. - recoverDag(dag, getStartClock) - } - LOG.info(s"Change to new DAG(dag = ${dag.version}), send back ChangeToNewDAGSuccess") - sender ! ChangeToNewDAGSuccess(clocks.map { pair => - val (id, clock) = pair - (id, clock.min) - }) - } - - private def removeProcessor(processorId: ProcessorId): Unit = { - clocks = clocks - processorId - processorClocks = processorClocks.filter(_.processorId != processorId) - - upstreamClocks = upstreamClocks.map { pair => - val (id, upstreams) = pair - val updatedUpstream = upstreams.filter(_.processorId != processorId) - (id, updatedUpstream) - } - - upstreamClocks = upstreamClocks - processorId - - // Removes dead processor from checkpoints. 
- checkpointClocks = checkpointClocks.filter { kv => - val (taskId, processor) = kv - taskId.processorId != processorId - } - } - - private def minClock: TimeStamp = { - ProcessorClocks.minClock(processorClocks) - } - - def selfCheck(): Unit = { - val minTimestamp = minClock - - if (Long.MaxValue == minTimestamp) { - processorClocks.foreach { clock => - LOG.info(s"Processor ${clock.processorId} Clock: min: ${clock.min}, " + - s"taskClocks: " + clock.taskClocks.mkString(",")) - } - } - - healthChecker.check(minTimestamp, clocks, dag, System.currentTimeMillis()) - } - - private def getStartClock: TimeStamp = { - minCheckpointClock.getOrElse(minClock) - } - - private def snapshotStartClock(): Unit = { - store.put(START_CLOCK, getStartClock) - } - - private def updateCheckpointClocks(task: TaskId, time: TimeStamp): Unit = { - val clocks = checkpointClocks(task) :+ time - checkpointClocks += task -> clocks - - if (checkpointClocks.forall(_._2.contains(time))) { - minCheckpointClock = Some(time) - LOG.info(s"minCheckpointTime $minCheckpointClock") - - checkpointClocks = checkpointClocks.mapValues(_.dropWhile(_ <= time)) - } - } -} - -object ClockService { - val START_CLOCK = "startClock" - - case object HealthCheck - - class ProcessorClock(val processorId: ProcessorId, val life: LifeTime, val parallism: Int, - private var _min: TimeStamp = 0L, private var _taskClocks: Array[TimeStamp] = null) { - - def copy(life: LifeTime): ProcessorClock = { - new ProcessorClock(processorId, life, parallism, _min, _taskClocks) - } - - def min: TimeStamp = _min - def taskClocks: Array[TimeStamp] = _taskClocks - - def init(startClock: TimeStamp): Unit = { - if (taskClocks == null) { - this._min = startClock - this._taskClocks = new Array(parallism) - util.Arrays.fill(taskClocks, startClock) - } - } - - def updateMinClock(taskIndex: Int, clock: TimeStamp): Unit = { - taskClocks(taskIndex) = clock - _min = Longs.min(taskClocks: _*) - } - } - - case object SnapshotStartClock - - case class 
Report(stallingTasks: List[TaskId]) - - /** - * Check whether the clock is advancing normally - */ - class HealthChecker(stallingThresholdSeconds: Int) { - private val LOG: Logger = LogUtil.getLogger(getClass) - - private var minClock: ClockValue = null - private val stallingThresholdMilliseconds = stallingThresholdSeconds * 1000 - // 60 seconds - private var stallingTasks = Array.empty[TaskId] - - /** Check for stalling tasks */ - def check( - currentMinClock: TimeStamp, processorClocks: Map[ProcessorId, ProcessorClock], - dag: DAG, now: TimeStamp): Unit = { - var isClockStalling = false - if (null == minClock || currentMinClock > minClock.appClock) { - minClock = ClockValue(systemClock = now, appClock = currentMinClock) - } else { - // Clock not advancing - if (now > minClock.systemClock + stallingThresholdMilliseconds) { - LOG.warn(s"Clock has not advanced for ${(now - minClock.systemClock) / 1000} seconds " + - s"since ${minClock.prettyPrint}...") - isClockStalling = true - } - } - - if (isClockStalling) { - val processorId = dag.graph.topologicalOrderWithCirclesIterator.toList.find { processorId => - val clock = processorClocks.get(processorId) - if (clock.isDefined) { - clock.get.min == minClock.appClock - } else { - false - } - } - - processorId.foreach { processorId => - val processorClock = processorClocks(processorId) - val taskClocks = processorClock.taskClocks - stallingTasks = taskClocks.zipWithIndex.filter(_._1 == minClock.appClock). 
- map(pair => TaskId(processorId, pair._2)) - } - LOG.info(s"Stalling Tasks: ${stallingTasks.mkString(",")}") - } else { - stallingTasks = Array.empty[TaskId] - } - } - - def getReport: Report = { - Report(stallingTasks.toList) - } - } - - object HealthChecker { - case class ClockValue(systemClock: TimeStamp, appClock: TimeStamp) { - def prettyPrint: String = { - "(system clock: " + new Date(systemClock).toString + ", app clock: " + appClock + ")" - } - } - } - - object ProcessorClocks { - - // Get the Min clock of all processors - def minClock(clock: Array[ProcessorClock]): TimeStamp = { - var i = 0 - var min = if (clock.length == 0) 0L else clock(0).min - while (i < clock.length) { - min = Math.min(min, clock(i).min) - i += 1 - } - min - } - } - - case class ChangeToNewDAG(dag: DAG) - case class ChangeToNewDAGSuccess(clocks: Map[ProcessorId, TimeStamp]) - - case class StoredStartClock(clock: TimeStamp) -} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/appmaster/DagManager.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/appmaster/DagManager.scala b/streaming/src/main/scala/io/gearpump/streaming/appmaster/DagManager.scala deleted file mode 100644 index f18b387..0000000 --- a/streaming/src/main/scala/io/gearpump/streaming/appmaster/DagManager.scala +++ /dev/null @@ -1,182 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming.appmaster - -import scala.concurrent.Future - -import akka.actor.{Actor, ActorRef, Stash} -import org.slf4j.Logger - -import io.gearpump.cluster.UserConfig -import io.gearpump.partitioner.PartitionerDescription -import io.gearpump.streaming._ -import io.gearpump.streaming.appmaster.DagManager._ -import io.gearpump.streaming.storage.AppDataStore -import io.gearpump.streaming.task.Subscriber -import io.gearpump.util.{Graph, LogUtil} - -/** - * Handles dag modification and other stuff related with DAG - * - * DagManager maintains multiple version of DAGs. For each version, the DAG is immutable. 
- * For operations like modifying a processor, it will create a new version of DAG. - */ -class DagManager(appId: Int, userConfig: UserConfig, store: AppDataStore, dag: Option[DAG]) - extends Actor with Stash { - - import context.dispatcher - private val LOG: Logger = LogUtil.getLogger(getClass, app = appId) - private val NOT_INITIALIZED = -1 - - private var dags = List.empty[DAG] - private var maxProcessorId = -1 - private implicit val system = context.system - - private var watchers = List.empty[ActorRef] - - override def receive: Receive = null - - override def preStart(): Unit = { - LOG.info("Initializing Dag Service, get stored Dag ....") - store.get(StreamApplication.DAG).asInstanceOf[Future[DAG]].map { storedDag => - if (storedDag != null) { - dags :+= storedDag - } else { - dags :+= dag.getOrElse(DAG(userConfig.getValue[Graph[ProcessorDescription, - PartitionerDescription]](StreamApplication.DAG).get)) - } - maxProcessorId = { - val keys = dags.head.processors.keys - if (keys.size == 0) { - 0 - } else { - keys.max - } - } - self ! DagInitiated - } - context.become(waitForDagInitiate) - } - - def waitForDagInitiate: Receive = { - case DagInitiated => - unstashAll() - context.become(dagService) - case _ => - stash() - } - - private def nextProcessorId: ProcessorId = { - maxProcessorId += 1 - maxProcessorId - } - - private def taskLaunchData(dag: DAG, processorId: Int, context: AnyRef): TaskLaunchData = { - val processorDescription = dag.processors(processorId) - val subscribers = Subscriber.of(processorId, dag) - TaskLaunchData(processorDescription, subscribers, context) - } - - def dagService: Receive = { - case GetLatestDAG => - // Get the latest version of DAG. - sender ! LatestDAG(dags.last) - case GetTaskLaunchData(version, processorId, context) => - // Task information like Processor class, downstream subscriber processors and etc. 
- dags.find(_.version == version).foreach { dag => - LOG.info(s"Get task launcher data for processor: $processorId, dagVersion: $version") - sender ! taskLaunchData(dag, processorId, context) - } - case ReplaceProcessor(oldProcessorId, inputNewProcessor) => - // Replace a processor with new implementation. The upstream processors and downstream - // processors are NOT changed. - var newProcessor = inputNewProcessor.copy(id = nextProcessorId) - if (inputNewProcessor.jar == null) { - val oldJar = dags.last.processors.get(oldProcessorId).get - newProcessor = newProcessor.copy(jar = oldJar.jar) - } - if (dags.length > 1) { - sender ! DAGOperationFailed( - "We are in the process of handling previous dynamic dag change") - } else { - val oldDAG = dags.last - val newVersion = oldDAG.version + 1 - val newDAG = replaceDAG(oldDAG, oldProcessorId, newProcessor, newVersion) - dags :+= newDAG - - LOG.info(s"ReplaceProcessor old: $oldProcessorId, new: $newProcessor") - LOG.info(s"new DAG: $newDAG") - watchers.foreach(_ ! LatestDAG(newDAG)) - sender ! DAGOperationSuccess - } - - case WatchChange(watcher) => - // Checks whether there are modifications for this DAG. - if (!this.watchers.contains(watcher)) { - this.watchers :+= watcher - } - - case NewDAGDeployed(dagVersion) => - // Means dynamic Dag transition completed, and the new DAG version has been successfully - // deployed. The obsolete dag versions will be removed. 
- if (dagVersion != NOT_INITIALIZED) { - dags = dags.filter(_.version == dagVersion) - store.put(StreamApplication.DAG, dags.last) - } - } - - private def replaceDAG( - dag: DAG, oldProcessorId: ProcessorId, newProcessor: ProcessorDescription, newVersion: Int) - : DAG = { - val oldProcessorLife = LifeTime(dag.processors(oldProcessorId).life.birth, - newProcessor.life.birth) - - val newProcessorMap = dag.processors ++ - Map(oldProcessorId -> dag.processors(oldProcessorId).copy(life = oldProcessorLife), - newProcessor.id -> newProcessor) - - val newGraph = dag.graph.subGraph(oldProcessorId). - replaceVertex(oldProcessorId, newProcessor.id).addGraph(dag.graph) - new DAG(newVersion, newProcessorMap, newGraph) - } -} - -object DagManager { - case object DagInitiated - - case class WatchChange(watcher: ActorRef) - - case object GetLatestDAG - case class LatestDAG(dag: DAG) - - case class GetTaskLaunchData(dagVersion: Int, processorId: Int, context: AnyRef = null) - case class TaskLaunchData(processorDescription : ProcessorDescription, - subscribers: List[Subscriber], context: AnyRef = null) - - sealed trait DAGOperation - - case class ReplaceProcessor(oldProcessorId: ProcessorId, - newProcessorDescription: ProcessorDescription) extends DAGOperation - - sealed trait DAGOperationResult - case object DAGOperationSuccess extends DAGOperationResult - case class DAGOperationFailed(reason: String) extends DAGOperationResult - - case class NewDAGDeployed(dagVersion: Int) -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/appmaster/ExecutorManager.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/appmaster/ExecutorManager.scala b/streaming/src/main/scala/io/gearpump/streaming/appmaster/ExecutorManager.scala deleted file mode 100644 index ab99af5..0000000 --- 
a/streaming/src/main/scala/io/gearpump/streaming/appmaster/ExecutorManager.scala +++ /dev/null @@ -1,209 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming.appmaster - -import scala.concurrent.duration._ -import scala.util.{Failure, Try} - -import akka.actor.SupervisorStrategy.Stop -import akka.actor._ -import akka.remote.RemoteScope -import com.typesafe.config.Config -import org.apache.commons.lang.exception.ExceptionUtils - -import io.gearpump.cluster.AppMasterToWorker.ChangeExecutorResource -import io.gearpump.cluster.appmaster.ExecutorSystemScheduler.{ExecutorSystemJvmConfig, ExecutorSystemStarted, StartExecutorSystemTimeout, StartExecutorSystems} -import io.gearpump.cluster.appmaster.WorkerInfo -import io.gearpump.cluster.scheduler.{Resource, ResourceRequest} -import io.gearpump.cluster.worker.WorkerId -import io.gearpump.cluster.{AppJar, AppMasterContext, ExecutorContext, UserConfig} -import io.gearpump.streaming.ExecutorId -import io.gearpump.streaming.ExecutorToAppMaster.RegisterExecutor -import io.gearpump.streaming.appmaster.ExecutorManager._ -import io.gearpump.streaming.executor.Executor -import io.gearpump.util.{LogUtil, Util} - -/** - * ExecutorManager manage 
the start and stop of all executors. - * - * ExecutorManager will launch Executor when asked. It hide the details like starting - * a new ExecutorSystem from user. Please use ExecutorManager.props() to construct this actor - */ -private[appmaster] class ExecutorManager( - userConfig: UserConfig, - appContext: AppMasterContext, - executorFactory: (ExecutorContext, UserConfig, Address, ExecutorId) => Props, - clusterConfig: Config, - appName: String) - extends Actor { - - private val LOG = LogUtil.getLogger(getClass) - - import appContext.{appId, masterProxy, username} - - private var taskManager: ActorRef = null - - private implicit val actorSystem = context.system - private val systemConfig = context.system.settings.config - - private var executors = Map.empty[Int, ExecutorInfo] - - def receive: Receive = waitForTaskManager - - def waitForTaskManager: Receive = { - case SetTaskManager(taskManager) => - this.taskManager = taskManager - context.become(service orElse terminationWatch) - } - - // If something wrong on executor, ExecutorManager will stop the current executor, - // and wait for AppMaster to start a new executor. - override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, - withinTimeRange = 1.minute) { - case ex: Throwable => - val executorId = Try(sender.path.name.toInt) - executorId match { - case scala.util.Success(id) => { - executors -= id - LOG.error(s"Executor $id throws exception, stop it...\n" + - ExceptionUtils.getStackTrace(ex)) - } - case Failure(ex) => { - LOG.error(s"Sender ${sender.path} is dead, but seems it is not an executor...") - } - } - Stop - } - - // Responds to outside queries - def service: Receive = { - case StartExecutors(resources, jar) => - masterProxy ! 
StartExecutorSystems(resources, getExecutorJvmConfig(Some(jar))) - case ExecutorSystemStarted(executorSystem, boundedJar) => - import executorSystem.{address, executorSystemId, resource => executorResource, worker} - - val executorId = executorSystemId - val executorContext = ExecutorContext(executorId, worker, appId, appName, - appMaster = context.parent, executorResource) - executors += executorId -> ExecutorInfo(executorId, null, worker, boundedJar) - - // Starts executor - val executor = context.actorOf(executorFactory(executorContext, userConfig, - address, executorId), executorId.toString) - executorSystem.bindLifeCycleWith(executor) - case StartExecutorSystemTimeout => - taskManager ! StartExecutorsTimeOut - - case RegisterExecutor(executor, executorId, resource, worker) => - LOG.info(s"executor $executorId has been launched") - // Watches for executor termination - context.watch(executor) - val executorInfo = executors.get(executorId).get - executors += executorId -> executorInfo.copy(executor = executor) - taskManager ! ExecutorStarted(executorId, resource, worker.workerId, executorInfo.boundedJar) - - // Broadcasts message to all executors - case BroadCast(msg) => - LOG.info(s"Broadcast ${msg.getClass.getSimpleName} to all executors") - context.children.foreach(_ forward msg) - - // Unicasts message to single executor - case UniCast(executorId, msg) => - LOG.info(s"Unicast ${msg.getClass.getSimpleName} to executor $executorId") - val executor = executors.get(executorId) - executor.foreach(_.executor forward msg) - - case GetExecutorInfo => - sender ! executors - - // Tells Executor manager resources that are occupied. 
The Executor Manager can use this - // information to tell worker to reclaim un-used resources - case ExecutorResourceUsageSummary(resources) => - executors.foreach { pair => - val (executorId, executor) = pair - val resource = resources.get(executorId) - val worker = executor.worker.ref - // Notifies the worker the actual resource used by this application. - resource match { - case Some(resource) => - worker ! ChangeExecutorResource(appId, executorId, resource) - case None => - worker ! ChangeExecutorResource(appId, executorId, Resource(0)) - } - } - } - - def terminationWatch: Receive = { - case Terminated(actor) => - val executorId = Try(actor.path.name.toInt) - executorId match { - case scala.util.Success(id) => { - executors -= id - LOG.error(s"Executor $id is down") - taskManager ! ExecutorStopped(id) - } - case scala.util.Failure(ex) => - LOG.error(s"failed to get the executor Id from path string ${actor.path}", ex) - } - } - - private def getExecutorJvmConfig(jar: Option[AppJar]): ExecutorSystemJvmConfig = { - val executorAkkaConfig = clusterConfig - val jvmSetting = Util.resolveJvmSetting(executorAkkaConfig.withFallback(systemConfig)).executor - - ExecutorSystemJvmConfig(jvmSetting.classPath, jvmSetting.vmargs, jar, - username, executorAkkaConfig) - } -} - -private[appmaster] object ExecutorManager { - case class StartExecutors(resources: Array[ResourceRequest], jar: AppJar) - case class BroadCast(msg: Any) - - case class UniCast(executorId: Int, msg: Any) - - case object GetExecutorInfo - - case class ExecutorStarted( - executorId: Int, resource: Resource, workerId: WorkerId, boundedJar: Option[AppJar]) - case class ExecutorStopped(executorId: Int) - - case class SetTaskManager(taskManager: ActorRef) - - case object StartExecutorsTimeOut - - def props( - userConfig: UserConfig, appContext: AppMasterContext, clusterConfig: Config, appName: String) - : Props = { - val executorFactory = - (executorContext: ExecutorContext, - userConfig: UserConfig, - 
address: Address, - executorId: ExecutorId) => - Props(classOf[Executor], executorContext, userConfig) - .withDeploy(Deploy(scope = RemoteScope(address))) - - Props(new ExecutorManager(userConfig, appContext, executorFactory, clusterConfig, appName)) - } - - case class ExecutorResourceUsageSummary(resources: Map[ExecutorId, Resource]) - - case class ExecutorInfo( - executorId: ExecutorId, executor: ActorRef, worker: WorkerInfo, boundedJar: Option[AppJar]) -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/appmaster/JarScheduler.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/appmaster/JarScheduler.scala b/streaming/src/main/scala/io/gearpump/streaming/appmaster/JarScheduler.scala deleted file mode 100644 index a2a9c74..0000000 --- a/streaming/src/main/scala/io/gearpump/streaming/appmaster/JarScheduler.scala +++ /dev/null @@ -1,176 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package io.gearpump.streaming.appmaster - -import scala.concurrent.Future - -import akka.actor._ -import akka.pattern.ask -import com.typesafe.config.Config - -import io.gearpump.TimeStamp -import io.gearpump.cluster.AppJar -import io.gearpump.cluster.scheduler.{Resource, ResourceRequest} -import io.gearpump.cluster.worker.WorkerId -import io.gearpump.partitioner.PartitionerDescription -import io.gearpump.streaming.appmaster.JarScheduler._ -import io.gearpump.streaming.task.TaskId -import io.gearpump.streaming.{DAG, ProcessorDescription} -import io.gearpump.util.{Constants, Graph, LogUtil} - -/** - * Different processors of the stream application can use different jars. JarScheduler is the - * scheduler for different jars. - * - * For a DAG of multiple processors, each processor can have its own jar. Tasks of same jar - * is scheduled by TaskScheduler, and TaskSchedulers are scheduled by JarScheduler. - * - * In runtime, the implementation is delegated to actor JarSchedulerImpl - */ -class JarScheduler(appId: Int, appName: String, config: Config, factory: ActorRefFactory) { - private val actor: ActorRef = factory.actorOf(Props(new JarSchedulerImpl(appId, appName, config))) - private implicit val dispatcher = factory.dispatcher - private implicit val timeout = Constants.FUTURE_TIMEOUT - - /** Set the current DAG version active */ - def setDag(dag: DAG, startClock: Future[TimeStamp]): Unit = { - actor ! TransitToNewDag - startClock.map { start => - actor ! NewDag(dag, start) - } - } - - /** AppMaster ask JarScheduler about how many resource it wants */ - def getResourceRequestDetails(): Future[Array[ResourceRequestDetail]] = { - (actor ? GetResourceRequestDetails).asInstanceOf[Future[Array[ResourceRequestDetail]]] - } - - /** - * AppMaster has resource allocated, and ask the jar scheduler to schedule tasks - * for this executor. 
- */ - def scheduleTask(appJar: AppJar, workerId: WorkerId, executorId: Int, resource: Resource) - : Future[List[TaskId]] = { - (actor ? ScheduleTask(appJar, workerId, executorId, resource)) - .asInstanceOf[Future[List[TaskId]]] - } - - /** - * Some executor JVM process is dead. AppMaster asks jar scheduler to re-schedule the impacted - * tasks. - */ - def executorFailed(executorId: Int): Future[Option[ResourceRequestDetail]] = { - (actor ? ExecutorFailed(executorId)).asInstanceOf[Future[Option[ResourceRequestDetail]]] - } -} - -object JarScheduler { - - case class ResourceRequestDetail(jar: AppJar, requests: Array[ResourceRequest]) - - case class NewDag(dag: DAG, startTime: TimeStamp) - - case object TransitToNewDag - - case object GetResourceRequestDetails - - /** - * Schedule tasks for one appJar. - * - * @param appJar Application jar. - * @param workerId Worker machine Id. - * @param executorId Executor Id. - * @param resource Slots that are available. - */ - case class ScheduleTask(appJar: AppJar, workerId: WorkerId, executorId: Int, resource: Resource) - - /** Some executor JVM is dead, try to recover tasks that are located on failed executor */ - case class ExecutorFailed(executorId: Int) - - class JarSchedulerImpl(appId: Int, appName: String, config: Config) extends Actor with Stash { - - // Each TaskScheduler maps to a jar. - private var taskSchedulers = Map.empty[AppJar, TaskScheduler] - - private val LOG = LogUtil.getLogger(getClass) - - def receive: Receive = waitForNewDag - - def waitForNewDag: Receive = { - case TransitToNewDag => // Continue current state - case NewDag(dag, startTime) => - - LOG.info(s"Init JarScheduler, dag version: ${dag.version}, startTime: $startTime") - - val processors = dag.processors.values.groupBy(_.jar) - - taskSchedulers = processors.map { jarAndProcessors => - val (jar, processors) = jarAndProcessors - // Construct the sub DAG, each sub DAG maps to a separate jar. 
- val subGraph = Graph.empty[ProcessorDescription, PartitionerDescription] - processors.foreach { processor => - if (startTime < processor.life.death) { - subGraph.addVertex(processor) - } - } - val subDagForSingleJar = DAG(subGraph) - - val taskScheduler = taskSchedulers - .getOrElse(jar, new TaskSchedulerImpl(appId, appName, config)) - - LOG.info(s"Set DAG for TaskScheduler, count: " + subDagForSingleJar.processors.size) - taskScheduler.setDAG(subDagForSingleJar) - jar -> taskScheduler - } - unstashAll() - context.become(ready) - case other => - stash() - } - - def ready: Receive = { - // Notifies there is a new DAG coming. - case TransitToNewDag => - context.become(waitForNewDag) - - case GetResourceRequestDetails => - - // Asks each TaskScheduler (Each for one jar) the resource requests. - val result: Array[ResourceRequestDetail] = taskSchedulers.map { jarAndScheduler => - val (jar, scheduler) = jarAndScheduler - ResourceRequestDetail(jar, scheduler.getResourceRequests()) - }.toArray - LOG.info(s"GetRequestDetails " + result.mkString(";")) - sender ! result - - case ScheduleTask(appJar, workerId, executorId, resource) => - val result: List[TaskId] = taskSchedulers.get(appJar).map { scheduler => - scheduler.schedule(workerId, executorId, resource) - }.getOrElse(List.empty) - LOG.info(s"ScheduleTask " + result.mkString(";")) - sender ! result - case ExecutorFailed(executorId) => - val result: Option[ResourceRequestDetail] = taskSchedulers. - find(_._2.scheduledTasks(executorId).nonEmpty).map { jarAndScheduler => - ResourceRequestDetail(jarAndScheduler._1, jarAndScheduler._2.executorFailed(executorId)) - } - LOG.info(s"ExecutorFailed " + result.mkString(";")) - sender ! 
result - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/appmaster/StreamAppMasterSummary.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/appmaster/StreamAppMasterSummary.scala b/streaming/src/main/scala/io/gearpump/streaming/appmaster/StreamAppMasterSummary.scala deleted file mode 100644 index 3b2c8bf..0000000 --- a/streaming/src/main/scala/io/gearpump/streaming/appmaster/StreamAppMasterSummary.scala +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.gearpump.streaming.appmaster - -import io.gearpump._ -import io.gearpump.cluster.AppMasterToMaster.AppMasterSummary -import io.gearpump.cluster.MasterToAppMaster.AppMasterStatus -import io.gearpump.cluster.{MasterToAppMaster, UserConfig} -import io.gearpump.streaming.appmaster.AppMaster.ExecutorBrief -import io.gearpump.streaming.{ExecutorId, LifeTime, ProcessorId} -import io.gearpump.util.Graph -import io.gearpump.util.HistoryMetricsService.HistoryMetricsConfig - -/** Stream application summary, used for REST API */ -case class StreamAppMasterSummary( - appType: String = "streaming", - appId: Int, - appName: String = null, - actorPath: String = null, - clock: TimeStamp = 0L, - status: AppMasterStatus = MasterToAppMaster.AppMasterActive, - startTime: TimeStamp = 0L, - uptime: TimeStamp = 0L, - user: String = null, - homeDirectory: String = "", - logFile: String = "", - dag: Graph[ProcessorId, String] = null, - executors: List[ExecutorBrief] = null, - processors: Map[ProcessorId, ProcessorSummary] = Map.empty[ProcessorId, ProcessorSummary], - // Hiearachy level for each processor - processorLevels: Map[ProcessorId, Int] = Map.empty[ProcessorId, Int], - historyMetricsConfig: HistoryMetricsConfig = null) - extends AppMasterSummary - -case class TaskCount(count: Int) - -case class ProcessorSummary( - id: ProcessorId, - taskClass: String, - parallelism: Int, - description: String, - taskConf: UserConfig, - life: LifeTime, - executors: List[ExecutorId], - taskCount: Map[ExecutorId, TaskCount]) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskLocator.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskLocator.scala b/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskLocator.scala deleted file mode 100644 index 6e7ebc6..0000000 --- 
a/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskLocator.scala +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.gearpump.streaming.appmaster - -import scala.collection.JavaConverters._ -import scala.util.Try - -import com.typesafe.config.{Config, ConfigFactory, ConfigRenderOptions, ConfigValueFactory} - -import io.gearpump.cluster.worker.WorkerId -import io.gearpump.streaming.Constants -import io.gearpump.streaming.appmaster.TaskLocator.{Localities, Locality, NonLocality, WorkerLocality} -import io.gearpump.streaming.task.TaskId - -/** - * TaskLocator is used to decide which machine one task should run on. - * - * User can specify config [[io.gearpump.streaming.Constants#GEARPUMP_STREAMING_LOCALITIES]] to - * decide to control which machine the task is running on. 
- */ -class TaskLocator(appName: String, config: Config) { - private val taskLocalities: Map[TaskId, Locality] = loadTaskLocalities(config) - - /** Finds where a task should belongs to */ - def locateTask(taskId: TaskId): Locality = { - taskLocalities.getOrElse(taskId, NonLocality) - } - - private def loadTaskLocalities(config: Config): Map[TaskId, Locality] = { - import io.gearpump.streaming.Constants.GEARPUMP_STREAMING_LOCALITIES - Try(config.getConfig(s"$GEARPUMP_STREAMING_LOCALITIES.$appName")).map { appConfig => - val json = appConfig.root().render(ConfigRenderOptions.concise) - Localities.fromJson(json) - }.map { localityConfig => - import localityConfig.localities - localities.keySet.flatMap { workerId => - val tasks = localities(workerId) - tasks.map((_, WorkerLocality(workerId))) - }.toArray.toMap - }.getOrElse(Map.empty[TaskId, Locality]) - } -} - -object TaskLocator { - - trait Locality - - /** Means we require the resource from the specific worker */ - case class WorkerLocality(workerId: WorkerId) extends Locality - - /** Means no preference on worker */ - object NonLocality extends Locality - - /** Localities settings. Mapping from workerId to list of taskId */ - case class Localities(localities: Map[WorkerId, Array[TaskId]]) - - object Localities { - val pattern = "task_([0-9]+)_([0-9]+)".r - - // To avoid polluting the classpath, we do the JSON translation ourself instead of - // introducing JSON library dependencies directly. 
- def fromJson(json: String): Localities = { - val localities = ConfigFactory.parseString(json).getAnyRef("localities") - .asInstanceOf[java.util.Map[String, String]].asScala.map { pair => - val workerId: WorkerId = WorkerId.parse(pair._1) - val tasks = pair._2.split(",").map { task => - val pattern(processorId, taskIndex) = task - TaskId(processorId.toInt, taskIndex.toInt) - } - (workerId, tasks) - }.toMap - new Localities(localities) - } - - def toJson(localities: Localities): String = { - val map = localities.localities.toList.map { pair => - (WorkerId.render(pair._1), pair._2.map(task => - s"task_${task.processorId}_${task.index}").mkString(",")) - }.toMap.asJava - ConfigFactory.empty().withValue("localities", ConfigValueFactory.fromAnyRef(map)). - root.render(ConfigRenderOptions.concise()) - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskManager.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskManager.scala b/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskManager.scala deleted file mode 100644 index 51f2f9c..0000000 --- a/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskManager.scala +++ /dev/null @@ -1,497 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming.appmaster - -import scala.concurrent.Future -import scala.concurrent.duration._ - -import akka.actor._ -import akka.pattern.ask -import org.slf4j.Logger - -import io.gearpump.TimeStamp -import io.gearpump.cluster.MasterToAppMaster.ReplayFromTimestampWindowTrailingEdge -import io.gearpump.streaming.AppMasterToExecutor._ -import io.gearpump.streaming.ExecutorToAppMaster.{MessageLoss, RegisterTask, UnRegisterTask} -import io.gearpump.streaming._ -import io.gearpump.streaming.appmaster.AppMaster.{AllocateResourceTimeOut, LookupTaskActorRef, TaskActorRef} -import io.gearpump.streaming.appmaster.ClockService.{ChangeToNewDAG, ChangeToNewDAGSuccess} -import io.gearpump.streaming.appmaster.DagManager.{GetLatestDAG, GetTaskLaunchData, LatestDAG, NewDAGDeployed, TaskLaunchData, WatchChange} -import io.gearpump.streaming.appmaster.ExecutorManager.{ExecutorStarted, StartExecutorsTimeOut, _} -import io.gearpump.streaming.appmaster.TaskManager._ -import io.gearpump.streaming.appmaster.TaskRegistry.{Accept, TaskLocation} -import io.gearpump.streaming.executor.Executor.RestartTasks -import io.gearpump.streaming.executor.ExecutorRestartPolicy -import io.gearpump.streaming.task._ -import io.gearpump.streaming.util.ActorPathUtil -import io.gearpump.util.{Constants, LogUtil} - -/** - * - * TaskManager track all tasks's status. - * - * It is state machine with three states: - * 1. applicationReady - * 2. recovery - * 3. dynamicDag - * - * When in state applicationReady: - * 1. 
When there is message-loss or JVM crash, transit to state recovery. - * 2. When user modify the DAG, transit to dynamicDag. - * - * When in state recovery: - * 1. When all tasks has been recovered, transit to applicationReady. - * - * When in state dynamicDag: - * 1. When dynamic dag transition is complete, transit to applicationReady. - * 2. When there is message loss or JVM crash, transit to state recovery. - */ -private[appmaster] class TaskManager( - appId: Int, - dagManager: ActorRef, - jarScheduler: JarScheduler, - executorManager: ActorRef, - clockService: ActorRef, - appMaster: ActorRef, - appName: String) - extends Actor { - - private val LOG: Logger = LogUtil.getLogger(getClass, app = appId) - private val systemConfig = context.system.settings.config - - private val ids = new SessionIdFactory() - - private val executorRestartPolicy = new ExecutorRestartPolicy(maxNrOfRetries = 5, - withinTimeRange = 20.seconds) - - private implicit val timeout = Constants.FUTURE_TIMEOUT - private implicit val actorSystem = context.system - - import context.dispatcher - - dagManager ! WatchChange(watcher = self) - executorManager ! SetTaskManager(self) - - private def getStartClock: Future[TimeStamp] = { - (clockService ? GetStartClock).asInstanceOf[Future[StartClock]].map(_.clock) - } - - private var startClock: Future[TimeStamp] = getStartClock - - def receive: Receive = applicationReady(DagReadyState.empty) - - private def onClientQuery(taskRegistry: TaskRegistry): Receive = { - case clock: ClockEvent => - clockService forward clock - case GetTaskList => - sender ! TaskList(taskRegistry.getTaskExecutorMap) - case LookupTaskActorRef(taskId) => - val executorId = taskRegistry.getExecutorId(taskId) - val requestor = sender - executorId.map { executorId => - val taskPath = ActorPathUtil.taskActorPath(appMaster, executorId, taskId) - context.actorSelection(taskPath).resolveOne(3.seconds).map { taskActorRef => - requestor ! 
TaskActorRef(taskActorRef) - } - } - } - - /** - * State applicationReady - */ - def applicationReady(state: DagReadyState): Receive = { - executorManager ! state.taskRegistry.usedResource - dagManager ! NewDAGDeployed(state.dag.version) - dagManager ! GetLatestDAG - LOG.info(s"goto state ApplicationReady(dag = ${state.dag.version})...") - - val recoverRegistry = new TaskRegistry(expectedTasks = state.dag.tasks, - deadTasks = state.taskRegistry.deadTasks) - - val recoverState = new StartDagState(state.dag, recoverRegistry) - - val onError: Receive = { - case executorStopped@ExecutorStopped(executorId) => - if (state.taskRegistry.isTaskRegisteredForExecutor(executorId)) { - self ! executorStopped - context.become(recovery(recoverState)) - } - case MessageLoss(executorId, taskId, cause) => - if (state.taskRegistry.isTaskRegisteredForExecutor(executorId) && - executorRestartPolicy.allowRestartExecutor(executorId)) { - context.become(recovery(recoverState)) - } else { - val errorMsg = s"Task $taskId fails too many times to recover" - appMaster ! FailedToRecover(errorMsg) - } - case replay: ReplayFromTimestampWindowTrailingEdge => - LOG.error(s"Received $replay") - context.become(recovery(recoverState)) - } - - val onNewDag: Receive = { - case LatestDAG(newDag) => - - if (newDag.version > state.dag.version) { - - executorManager ! BroadCast(StartDynamicDag(newDag.version)) - LOG.info("Broadcasting StartDynamicDag") - - val dagDiff = migrate(state.dag, newDag) - jarScheduler.setDag(newDag, startClock) - val resourceRequestsDetails = jarScheduler.getResourceRequestDetails() - resourceRequestsDetails.map { details => - details.foreach { detail => - if (detail.requests.length > 0 && detail.requests.exists(!_.resource.isEmpty)) { - executorManager ! 
StartExecutors(detail.requests, detail.jar) - } - } - } - - var modifiedTasks = List.empty[TaskId] - for (processorId <- dagDiff.modifiedProcessors ++ dagDiff.impactedUpstream) { - val executors = state.taskRegistry.processorExecutors(processorId) - executors.foreach { pair => - val (executorId, tasks) = pair - modifiedTasks ++= tasks - dagManager ! GetTaskLaunchData(newDag.version, processorId, - ChangeTasksOnExecutor(executorId, tasks)) - } - } - - val taskChangeRegistry = new TaskChangeRegistry(modifiedTasks) - - val deadTasks = state.taskRegistry.deadTasks - val registeredTasks = state.taskRegistry.registeredTasks - val dynamicTaskRegistry = new TaskRegistry(newDag.tasks, registeredTasks, deadTasks) - - val nextState = new StartDagState(newDag, dynamicTaskRegistry, taskChangeRegistry) - context.become(dynamicDag(nextState, recoverState)) - } - } - - val onUnRegisterTask: Receive = { - case unRegister: UnRegisterTask => - - LOG.info(s"Received $unRegister, stop task ${unRegister.taskId}") - sender ! 
StopTask(unRegister.taskId) - - val taskId = unRegister.taskId - val registry = state.taskRegistry - val deadTasks = registry.deadTasks - - val newRegistry = registry.copy(registeredTasks = registry.registeredTasks - taskId, - deadTasks = deadTasks + taskId) - - val newState = new DagReadyState(state.dag, newRegistry) - context.become(applicationReady(newState)) - } - - // Recovers to same version - onClientQuery(state.taskRegistry) orElse onError orElse onNewDag orElse - onUnRegisterTask orElse unHandled("applicationReady") - } - - /** - * State dynamicDag - */ - def dynamicDag(state: StartDagState, recoverState: StartDagState): Receive = { - LOG.info(s"DynamicDag transit to dag version: ${state.dag.version}...") - - val onMessageLoss: Receive = { - case executorStopped@ExecutorStopped(executorId) => - context.become(recovery(recoverState)) - case MessageLoss(executorId, taskId, cause) => - if (state.taskRegistry.isTaskRegisteredForExecutor(executorId) && - executorRestartPolicy.allowRestartExecutor(executorId)) { - context.become(recovery(recoverState)) - } else { - val errorMsg = s"Task $taskId fails too many times to recover" - appMaster ! FailedToRecover(errorMsg) - } - } - - onMessageLoss orElse onClientQuery(state.taskRegistry) orElse - startDag(state, recoverState) orElse unHandled("dynamicDag") - } - - private def startDag(state: StartDagState, recoverState: StartDagState): Receive = { - case executor: ExecutorStarted => - import executor.{boundedJar, executorId, resource, workerId} - val taskIdsFuture = jarScheduler.scheduleTask(boundedJar.get, workerId, executorId, resource) - taskIdsFuture.foreach { taskIds => - LOG.info(s"Executor $executor has been started, " + - s"start to schedule tasks: ${taskIds.mkString(",")}") - taskIds.groupBy(_.processorId).foreach { pair => - val (processorId, tasks) = pair - dagManager ! 
GetTaskLaunchData(state.dag.version, processorId, - StartTasksOnExecutor(executor.executorId, tasks)) - } - } - - case StartExecutorsTimeOut => - appMaster ! AllocateResourceTimeOut - case TaskLaunchData(processorDescription, subscribers, command) => - command match { - case StartTasksOnExecutor(executorId, tasks) => - LOG.info(s"Start tasks on Executor($executorId), tasks: " + tasks) - val launchTasks = LaunchTasks(tasks, state.dag.version, processorDescription, subscribers) - executorManager ! UniCast(executorId, launchTasks) - tasks.foreach(executorRestartPolicy.addTaskToExecutor(executorId, _)) - case ChangeTasksOnExecutor(executorId, tasks) => - LOG.info("change Task on executor: " + executorId + ", tasks: " + tasks) - val changeTasks = ChangeTasks(tasks, state.dag.version, processorDescription.life, - subscribers) - executorManager ! UniCast(executorId, changeTasks) - case other => - LOG.error(s"severe error! we expect ExecutorStarted but get ${other.getClass.toString}") - } - case TasksLaunched => - // We will track all launched task by message RegisterTask - case TasksChanged(tasks) => - tasks.foreach(task => state.taskChangeRegistry.taskChanged(task)) - - if (allTasksReady(state)) { - broadcastLocations(state) - } - - case RegisterTask(taskId, executorId, host) => - val client = sender - val register = state.taskRegistry - val status = register.registerTask(taskId, TaskLocation(executorId, host)) - if (status == Accept) { - LOG.info(s"RegisterTask($taskId) TaskLocation: $host, Executor: $executorId") - val sessionId = ids.newSessionId - - startClock.foreach(clock => client ! TaskRegistered(taskId, sessionId, clock)) - if (allTasksReady(state)) { - broadcastLocations(state) - } - } else { - sender ! 
TaskRejected(taskId) - } - - case TaskChanged(taskId, dagVersion) => - state.taskChangeRegistry.taskChanged(taskId) - if (allTasksReady(state)) { - broadcastLocations(state) - } - case locationReceived: TaskLocationsReceived => - state.executorReadyRegistry.registerExecutor(locationReceived.executorId) - if (allTasksReady(state) && - state.executorReadyRegistry.allRegistered(state.taskRegistry.executors)) { - LOG.info("All executors are ready to start...") - clockService ! ChangeToNewDAG(state.dag) - } - case locationRejected: TaskLocationsRejected => - LOG.error(s"received $locationRejected, start to recover") - context.become(recovery(recoverState)) - - case ChangeToNewDAGSuccess(_) => - if (allTasksReady(state) && - state.executorReadyRegistry.allRegistered(state.taskRegistry.executors)) { - executorManager ! BroadCast(StartAllTasks(state.dag.version)) - context.become(applicationReady(new DagReadyState(state.dag, state.taskRegistry))) - } - } - - def onExecutorError: Receive = { - case ExecutorStopped(executorId) => - if (executorRestartPolicy.allowRestartExecutor(executorId)) { - jarScheduler.executorFailed(executorId).foreach { resourceRequestDetail => - if (resourceRequestDetail.isDefined) { - executorManager ! StartExecutors(resourceRequestDetail.get.requests, - resourceRequestDetail.get.jar) - } - } - } else { - val errorMsg = s"Executor restarted too many times to recover" - appMaster ! FailedToRecover(errorMsg) - } - } - - private def allTasksReady(state: StartDagState): Boolean = { - import state.{taskChangeRegistry, taskRegistry} - taskRegistry.isAllTasksRegistered && taskChangeRegistry.allTaskChanged - } - - private def broadcastLocations(state: StartDagState): Unit = { - LOG.info(s"All tasks have been launched; send Task locations to all executors") - val taskLocations = state.taskRegistry.getTaskLocations - executorManager ! 
BroadCast(TaskLocationsReady(taskLocations, state.dag.version)) - } - - /** - * State recovery - */ - def recovery(state: StartDagState): Receive = { - val recoverDagVersion = state.dag.version - executorManager ! BroadCast(RestartTasks(recoverDagVersion)) - - // Use new Start Clock so that we recover at timepoint we fails. - startClock = getStartClock - - jarScheduler.setDag(state.dag, startClock) - - LOG.info(s"goto state Recovery(recoverDag = $recoverDagVersion)...") - val ignoreClock: Receive = { - case clock: ClockEvent => - // Ignores clock events. - } - - if (state.dag.isEmpty) { - applicationReady(new DagReadyState(state.dag, state.taskRegistry)) - } else { - val registry = new TaskRegistry(expectedTasks = state.dag.tasks, - deadTasks = state.taskRegistry.deadTasks) - - val recoverState = new StartDagState(state.dag, registry) - ignoreClock orElse startDag(state, recoverState) orElse onExecutorError orElse - unHandled("recovery") - } - } - - private def unHandled(state: String): Receive = { - case other => - LOG.info(s"Received unknown message $other in state $state") - } -} - -private[appmaster] object TaskManager { - - /** - * When application is ready, then transit to DagReadyState - */ - class DagReadyState(val dag: DAG, val taskRegistry: TaskRegistry) - - object DagReadyState { - def empty: DagReadyState = { - new DagReadyState( - DAG.empty.copy(version = -1), - new TaskRegistry(List.empty[TaskId])) - } - } - - /** - * When application is booting up or doing recovery, it use StartDagState - */ - class StartDagState( - val dag: DAG, - val taskRegistry: TaskRegistry, - val taskChangeRegistry: TaskChangeRegistry = new TaskChangeRegistry(List.empty[TaskId]), - val executorReadyRegistry: ExecutorRegistry = new ExecutorRegistry) - - case object GetTaskList - - case class TaskList(tasks: Map[TaskId, ExecutorId]) - - case class FailedToRecover(errorMsg: String) - - /** - * Starts new Tasks on Executor executorId - */ - case class 
StartTasksOnExecutor(executorId: Int, tasks: List[TaskId]) - - /** - * Changes existing tasks on executor executorId - */ - case class ChangeTasksOnExecutor(executorId: Int, tasks: List[TaskId]) - - /** - * Tracks the registration of all new started executors. - */ - class ExecutorRegistry { - private var registeredExecutors = Set.empty[ExecutorId] - - def registerExecutor(executorId: ExecutorId): Unit = { - registeredExecutors += executorId - } - - def allRegistered(all: List[ExecutorId]): Boolean = { - all.forall(executor => registeredExecutors.contains(executor)) - } - } - - /** - * Tracks the registration of all changed tasks. - */ - class TaskChangeRegistry(targetTasks: List[TaskId]) { - private var registeredTasks = Set.empty[TaskId] - def taskChanged(taskId: TaskId): Unit = { - registeredTasks += taskId - } - def allTaskChanged: Boolean = { - targetTasks.forall(taskId => registeredTasks.contains(taskId)) - } - } - - object TaskChangeRegistry { - def empty: TaskChangeRegistry = new TaskChangeRegistry(List.empty[TaskId]) - } - - /** - * DAGDiff is used to track impacted processors when doing dynamic dag. - */ - case class DAGDiff( - addedProcessors: List[ProcessorId], - modifiedProcessors: List[ProcessorId], - impactedUpstream: List[ProcessorId]) - - /** - * Migrates from old DAG to new DAG, return DAGDiff - */ - def migrate(leftDAG: DAG, rightDAG: DAG): DAGDiff = { - val left = leftDAG.processors.keySet - val right = rightDAG.processors.keySet - - val added = right -- left - val join = right -- added - - val modified = join.filter { processorId => - leftDAG.processors(processorId) != rightDAG.processors(processorId) - } - - val upstream = (list: Set[ProcessorId]) => { - list.flatMap { processorId => - rightDAG.graph.incomingEdgesOf(processorId).map(_._1).toSet - } -- list - } - - val impactedUpstream = upstream(added ++ modified) - - // All upstream tasks are affected, and should be handled properly. 
- DAGDiff(added.toList, modified.toList, impactedUpstream.toList) - } - - /** - * Each new task will be assigned with a unique session Id. - */ - class SessionIdFactory { - private var nextSessionId = 1 - - /** - * Returns a new session Id for new task - */ - final def newSessionId: Int = { - val sessionId = nextSessionId - nextSessionId += 1 - sessionId - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskRegistry.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskRegistry.scala b/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskRegistry.scala deleted file mode 100644 index adfdeba..0000000 --- a/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskRegistry.scala +++ /dev/null @@ -1,143 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
package io.gearpump.streaming.appmaster

import org.slf4j.Logger

import io.gearpump.cluster.scheduler.Resource
import io.gearpump.streaming.appmaster.ExecutorManager.ExecutorResourceUsageSummary
import io.gearpump.streaming.appmaster.TaskRegistry._
import io.gearpump.streaming.task.TaskId
import io.gearpump.streaming.{ExecutorId, ProcessorId}
import io.gearpump.transport.HostPort
import io.gearpump.util.LogUtil

/**
 * Tracks the registration of all tasks while the application is booting up.
 *
 * @param expectedTasks   tasks that are expected to register themselves
 * @param registeredTasks tasks that have registered so far, keyed to their locations
 * @param deadTasks       tasks known to be dead; they will never register
 */
class TaskRegistry(val expectedTasks: List[TaskId],
    var registeredTasks: Map[TaskId, TaskLocation] = Map.empty[TaskId, TaskLocation],
    var deadTasks: Set[TaskId] = Set.empty[TaskId]) {

  private val LOG: Logger = LogUtil.getLogger(getClass)

  // Processors that own at least one expected task; used to validate registrations.
  private val processors = expectedTasks.map(_.processorId).toSet

  /**
   * When a task is booted, it needs to call registerTask to register itself.
   * The registration is accepted when the task belongs to a known processor,
   * and rejected otherwise.
   *
   * @param taskId   task that registers itself to this TaskRegistry
   * @param location host and port where this task is running. NOTE: this is the
   *                 host and port of the custom netty layer
   *                 (see [[io.gearpump.transport.netty.Context]]), NOT the host
   *                 and port of Akka remoting.
   */
  def registerTask(taskId: TaskId, location: TaskLocation): RegisterTaskStatus = {
    if (processors.contains(taskId.processorId)) {
      registeredTasks += taskId -> location
      Accept
    } else {
      LOG.error(s" the task is not accepted for registration, taskId: ${taskId}")
      Reject
    }
  }

  /** Returns a new TaskRegistry with the given fields replaced. */
  def copy(expectedTasks: List[TaskId] = this.expectedTasks,
      registeredTasks: Map[TaskId, TaskLocation] = this.registeredTasks,
      deadTasks: Set[TaskId] = this.deadTasks): TaskRegistry = {
    new TaskRegistry(expectedTasks, registeredTasks, deadTasks)
  }

  /** Groups the registered tasks by host, yielding host -> set of task ids. */
  def getTaskLocations: TaskLocations = {
    val byHost = registeredTasks.toList.groupBy { case (_, location) => location.host }
    val locations = byHost.map { case (host, entries) =>
      (host, entries.map { case (taskId, _) => taskId }.toSet)
    }
    TaskLocations(locations)
  }

  /** Maps every registered task to its executor id, or -1 when the executor is unknown. */
  def getTaskExecutorMap: Map[TaskId, ExecutorId] = {
    getTaskLocations.locations.flatMap { case (_, taskSet) =>
      taskSet.map(taskId => (taskId, getExecutorId(taskId).getOrElse(-1)))
    }
  }

  /** Query the executor Id where the task is running on */
  def getExecutorId(taskId: TaskId): Option[Int] = {
    registeredTasks.get(taskId).map(_.executorId)
  }

  /** Gets list of allocated executor Ids */
  def executors: List[ExecutorId] = {
    registeredTasks.values.map(_.executorId).toList
  }

  /** True when every expected task that is still alive has registered. */
  def isAllTasksRegistered: Boolean = {
    (expectedTasks.toSet -- deadTasks).forall(registeredTasks.contains)
  }

  /** True when at least one registered task runs on the given executor. */
  def isTaskRegisteredForExecutor(executorId: ExecutorId): Boolean = {
    registeredTasks.values.exists(_.executorId == executorId)
  }

  // Registered tasks that belong to the given processor.
  private def filterTasks(processorId: ProcessorId): List[TaskId] = {
    registeredTasks.keys.filter(_.processorId == processorId).toList
  }

  /** List of executors that the given processor's tasks are running on */
  def processorExecutors(processorId: ProcessorId): Map[ExecutorId, List[TaskId]] = {
    val taskToExecutor = for {
      taskId <- filterTasks(processorId)
      executorId <- getExecutorId(taskId)
    } yield (taskId, executorId)

    taskToExecutor.groupBy { case (_, executorId) => executorId }
      .map { case (executorId, pairs) => (executorId, pairs.map(_._1)) }
  }

  /** Summary about how many resources are used for all running tasks */
  def usedResource: ExecutorResourceUsageSummary = {
    // Each registered task occupies exactly one resource slot on its executor.
    val resourceMap = registeredTasks.foldLeft(Map.empty[ExecutorId, Resource]) {
      case (acc, (_, location)) =>
        val used = acc.getOrElse(location.executorId, Resource(0)) + Resource(1)
        acc + (location.executorId -> used)
    }
    ExecutorResourceUsageSummary(resourceMap)
  }
}

object TaskRegistry {
  sealed trait RegisterTaskStatus
  case object Accept extends RegisterTaskStatus
  case object Reject extends RegisterTaskStatus

  case class TaskLocation(executorId: Int, host: HostPort)

  case class TaskLocations(locations: Map[HostPort, Set[TaskId]])
}
package io.gearpump.streaming.appmaster

import com.typesafe.config.Config

import io.gearpump.cluster.scheduler.{Relaxation, Resource, ResourceRequest}
import io.gearpump.cluster.worker.WorkerId
import io.gearpump.streaming.DAG
import io.gearpump.streaming.appmaster.TaskLocator.{Locality, WorkerLocality}
import io.gearpump.streaming.appmaster.TaskScheduler.{Location, TaskStatus}
import io.gearpump.streaming.task.TaskId
import io.gearpump.util.Constants

/**
 * Schedules tasks to run for new allocated resources. TaskScheduler only schedules tasks that
 * share the same jar. For scheduling for multiple jars, see
 * [[io.gearpump.streaming.appmaster.JarScheduler]].
 */
trait TaskScheduler {

  /**
   * Notifies the scheduler that the task DAG is created.
   *
   * @param dag task dag
   */
  def setDAG(dag: DAG): Unit

  /**
   * Gets the resource requirements for all unscheduled tasks.
   */
  def getResourceRequests(): Array[ResourceRequest]

  /**
   * Notifies the scheduler that a resource slot on {workerId} and {executorId} is allocated,
   * and expects tasks to be scheduled in return.
   * Task locality should be considered when deciding whether to offer a task on target
   * {worker} and {executor}.
   *
   * @param workerId which worker this resource is located on.
   * @param executorId which executor this resource belongs to.
   * @return a list of tasks
   */
  def schedule(workerId: WorkerId, executorId: Int, resource: Resource): List[TaskId]

  /**
   * Notifies the scheduler that {executorId} has failed, and expects a set of
   * ResourceRequest for all failed tasks on that executor.
   *
   * @param executorId executor that failed
   * @return resource requests of the failed executor
   */
  def executorFailed(executorId: Int): Array[ResourceRequest]

  /**
   * Queries the task list already scheduled on the executor.
   *
   * @param executorId executor to query
   * @return a list of tasks
   */
  def scheduledTasks(executorId: Int): List[TaskId]
}

object TaskScheduler {
  /** A concrete placement: which executor on which worker a task was assigned to. */
  case class Location(workerId: WorkerId, executorId: Int)

  /** Per-task scheduling state; allocation is null until the task is placed. */
  class TaskStatus(val taskId: TaskId, val preferLocality: Locality, var allocation: Location)
}

/**
 * Default [[TaskScheduler]] implementation. Tracks one [[TaskStatus]] per task of the DAG,
 * using the locality preferences resolved by [[TaskLocator]].
 */
class TaskSchedulerImpl(appId: Int, appName: String, config: Config) extends TaskScheduler {
  private val executorNum = config.getInt(Constants.APPLICATION_EXECUTOR_NUMBER)

  private var tasks = List.empty[TaskStatus]

  // Finds the locality of the tasks
  private val taskLocator = new TaskLocator(appName, config)

  override def setDAG(dag: DAG): Unit = {
    // Preserve existing allocations for tasks already known (e.g. on DAG update).
    val taskMap = tasks.map(_.taskId).zip(tasks).toMap

    tasks = dag.tasks.sortBy(_.index).map { taskId =>
      val locality = taskLocator.locateTask(taskId)
      taskMap.getOrElse(taskId, new TaskStatus(taskId, locality, allocation = null))
    }
  }

  def getResourceRequests(): Array[ResourceRequest] = {
    fetchResourceRequests(fromOneWorker = false)
  }

  import io.gearpump.cluster.scheduler.Relaxation._

  // Aggregates one Resource(1) per unallocated task, grouped by preferred worker
  // (WorkerId.unspecified for tasks with no worker preference).
  private def fetchResourceRequests(fromOneWorker: Boolean = false): Array[ResourceRequest] = {
    var workersResourceRequest = Map.empty[WorkerId, Resource]

    tasks.filter(_.allocation == null).foreach { task =>
      val workerId = task.preferLocality match {
        case WorkerLocality(preferred) => preferred
        case _ => WorkerId.unspecified
      }
      val current = workersResourceRequest.getOrElse(workerId, Resource.empty)
      workersResourceRequest += workerId -> (current + Resource(1))
    }

    workersResourceRequest.map { workerIdAndResource =>
      val (workerId, resource) = workerIdAndResource
      if (workerId == WorkerId.unspecified) {
        ResourceRequest(resource, workerId = WorkerId.unspecified, executorNum = executorNum)
      } else {
        ResourceRequest(resource, workerId, relaxation = SPECIFICWORKER)
      }
    }.toArray
  }

  override def schedule(workerId: WorkerId, executorId: Int, resource: Resource): List[TaskId] = {
    var scheduledTasks = List.empty[TaskId]
    val location = Location(workerId, executorId)
    // Schedules tasks for this specific worker first, ...
    scheduledTasks ++= scheduleTasksForLocality(resource, location,
      (locality) => locality == WorkerLocality(workerId))

    // ... then fills the remaining slots with tasks that have no location preference.
    scheduledTasks ++= scheduleTasksForLocality(resource - Resource(scheduledTasks.length),
      location, (locality) => true)
    scheduledTasks
  }

  // Assigns up to resource.slots unallocated tasks whose locality matches `matcher`
  // to `resourceLocation`, returning the taskIds assigned in this call.
  private def scheduleTasksForLocality(
      resource: Resource, resourceLocation: Location, matcher: (Locality) => Boolean)
    : List[TaskId] = {
    var scheduledTasks = List.empty[TaskId]
    var index = 0
    var remain = resource.slots
    while (index < tasks.length && remain > 0) {
      val taskStatus = tasks(index)
      if (taskStatus.allocation == null && matcher(taskStatus.preferLocality)) {
        taskStatus.allocation = resourceLocation
        scheduledTasks +:= taskStatus.taskId
        remain -= 1
      }
      index += 1
    }
    scheduledTasks
  }

  override def executorFailed(executorId: Int): Array[ResourceRequest] = {
    val failedTasks = tasks.filter { status =>
      status.allocation != null && status.allocation.executorId == executorId
    }
    // Cleans the location of failed tasks so they become schedulable again.
    failedTasks.foreach(_.allocation = null)

    // Fixed: previously a ResourceRequest(Resource(0), ...) was emitted even when
    // no task was running on the failed executor, asking the master for zero
    // resources. Return an empty request set in that case instead.
    if (failedTasks.isEmpty) {
      Array.empty[ResourceRequest]
    } else {
      Array(ResourceRequest(Resource(failedTasks.length),
        workerId = WorkerId.unspecified, relaxation = ONEWORKER))
    }
  }

  override def scheduledTasks(executorId: Int): List[TaskId] = {
    tasks.filter { status =>
      status.allocation != null && status.allocation.executorId == executorId
    }.map(_.taskId)
  }
}
package io.gearpump.streaming.dsl

import scala.language.implicitConversions

import org.slf4j.{Logger, LoggerFactory}

import io.gearpump.Message
import io.gearpump.cluster.UserConfig
import io.gearpump.streaming.dsl.op._
import io.gearpump.streaming.sink.DataSink
import io.gearpump.streaming.task.{Task, TaskContext}
import io.gearpump.util.Graph

/**
 * A typed stream in the high-level DSL. Each combinator appends an [[Op]] vertex
 * to the shared op graph and returns a new Stream pointing at that vertex.
 *
 * @param graph    the op graph shared by every stream of the same application
 * @param thisNode the op vertex this stream represents
 * @param edge     how this node connects to its downstream op (defaults to Direct)
 */
class Stream[T](
    private val graph: Graph[Op, OpEdge], private val thisNode: Op,
    private val edge: Option[OpEdge] = None) {

  /**
   * Converts a value[T] to a list of value[R].
   *
   * @param fun FlatMap function
   * @param description The description message for this operation
   * @return A new stream with type [R]
   */
  def flatMap[R](fun: T => TraversableOnce[R], description: String = null): Stream[R] = {
    val flatMapOp = FlatMapOp(fun, Option(description).getOrElse("flatmap"))
    graph.addVertex(flatMapOp)
    graph.addEdge(thisNode, edge.getOrElse(Direct), flatMapOp)
    new Stream[R](graph, flatMapOp)
  }

  /**
   * Maps message of type T to message of type R.
   *
   * @param fun Function
   * @return A new stream with type [R]
   */
  def map[R](fun: T => R, description: String = null): Stream[R] = {
    // Implemented on top of flatMap; a null result is dropped via Option.
    this.flatMap({ data =>
      Option(fun(data))
    }, Option(description).getOrElse("map"))
  }

  /**
   * Keeps records when fun(T) == true.
   *
   * @param fun the filter
   * @return a new stream after filter
   */
  def filter(fun: T => Boolean, description: String = null): Stream[T] = {
    this.flatMap({ data =>
      if (fun(data)) Option(data) else None
    }, Option(description).getOrElse("filter"))
  }

  /**
   * Reduce operation.
   *
   * @param fun reduction function
   * @param description description message for this operator
   * @return a new stream after reduction
   */
  def reduce(fun: (T, T) => T, description: String = null): Stream[T] = {
    val reduceOp = ReduceOp(fun, Option(description).getOrElse("reduce"))
    graph.addVertex(reduceOp)
    graph.addEdge(thisNode, edge.getOrElse(Direct), reduceOp)
    new Stream(graph, reduceOp)
  }

  /**
   * Log to task log file
   */
  def log(): Unit = {
    // The resulting stream is intentionally discarded; the side effect of
    // appending the log op to the graph is what matters here.
    this.map(msg => LoggerFactory.getLogger("dsl").info(msg.toString), "log")
  }

  /**
   * Merges data from two streams into one.
   *
   * @param other the other stream
   * @return the merged stream
   */
  def merge(other: Stream[T], description: String = null): Stream[T] = {
    val mergeOp = MergeOp(Option(description).getOrElse("merge"))
    graph.addVertex(mergeOp)
    graph.addEdge(thisNode, edge.getOrElse(Direct), mergeOp)
    graph.addEdge(other.thisNode, other.edge.getOrElse(Shuffle), mergeOp)
    new Stream[T](graph, mergeOp)
  }

  /**
   * Group by function (T => Group)
   *
   * For example, we have T type, People(name: String, gender: String, age: Int)
   * groupBy[People](_.gender) will group the people by gender.
   *
   * You can append other combinators after groupBy
   *
   * For example,
   * {{{
   * Stream[People].groupBy(_.gender).flatmap(..).filter.(..).reduce(..)
   * }}}
   *
   * @param fun Group by function
   * @param parallelism Parallelism level
   * @param description The description
   * @return the grouped stream
   */
  def groupBy[Group](fun: T => Group, parallelism: Int = 1, description: String = null)
    : Stream[T] = {
    val groupOp = GroupByOp(fun, parallelism, Option(description).getOrElse("groupBy"))
    graph.addVertex(groupOp)
    graph.addEdge(thisNode, edge.getOrElse(Shuffle), groupOp)
    new Stream[T](graph, groupOp)
  }

  /**
   * Connects with a low level Processor(TaskDescription).
   *
   * @param processor a user defined processor
   * @param parallelism parallelism level
   * @return new stream after processing with type [R]
   */
  def process[R](
      processor: Class[_ <: Task], parallelism: Int, conf: UserConfig = UserConfig.empty,
      description: String = null): Stream[R] = {
    val processorOp = ProcessorOp(processor, parallelism, conf,
      Option(description).getOrElse("process"))
    graph.addVertex(processorOp)
    graph.addEdge(thisNode, edge.getOrElse(Shuffle), processorOp)
    new Stream[R](graph, processorOp, Some(Shuffle))
  }
}

/** Extra combinators available on streams of key/value tuples. */
class KVStream[K, V](stream: Stream[Tuple2[K, V]]) {
  /**
   * GroupBy key
   *
   * Applies to Stream[Tuple2[K,V]]
   *
   * @param parallelism the parallelism for this operation
   * @return the new KV stream
   */
  def groupByKey(parallelism: Int = 1): Stream[Tuple2[K, V]] = {
    stream.groupBy(Stream.getTupleKey[K, V], parallelism, "groupByKey")
  }

  /**
   * Sum the value of the tuples
   *
   * Apply to Stream[Tuple2[K,V]], V must be of type Number
   *
   * For input (key, value1), (key, value2), will generate (key, value1 + value2)
   * @param numeric the numeric operations
   * @return the sum stream
   */
  def sum(implicit numeric: Numeric[V]): Stream[(K, V)] = {
    stream.reduce(Stream.sumByValue[K, V](numeric), "sum")
  }
}

object Stream {

  def apply[T](graph: Graph[Op, OpEdge], node: Op, edge: Option[OpEdge]): Stream[T] = {
    new Stream[T](graph, node, edge)
  }

  /** Key extractor for tuple streams; used by groupByKey. */
  def getTupleKey[K, V](tuple: Tuple2[K, V]): K = tuple._1

  /** Key-preserving value sum; used by KVStream.sum. */
  def sumByValue[K, V](numeric: Numeric[V]): (Tuple2[K, V], Tuple2[K, V]) => Tuple2[K, V]
    = (tuple1, tuple2) => Tuple2(tuple1._1, numeric.plus(tuple1._2, tuple2._2))

  implicit def streamToKVStream[K, V](stream: Stream[Tuple2[K, V]]): KVStream[K, V] = {
    new KVStream(stream)
  }

  implicit class Sink[T](stream: Stream[T]) extends java.io.Serializable {
    // NOTE(review): the method-level type parameter T shadows the implicit
    // class's T; signature kept unchanged for caller compatibility.
    def sink[T](dataSink: DataSink, parallism: Int, conf: UserConfig, description: String)
      : Stream[T] = {
      // Fixed: was Some(description).getOrElse("traversable") — Some(null) keeps
      // the null, so the default never applied. Option(...) matches the
      // null-to-default convention used everywhere else in this file.
      // (The stray `implicit` on this local val was also dropped — nothing
      // resolved it implicitly.)
      val sinkOp = DataSinkOp[T](dataSink, parallism, conf,
        Option(description).getOrElse("traversable"))
      stream.graph.addVertex(sinkOp)
      stream.graph.addEdge(stream.thisNode, Shuffle, sinkOp)
      new Stream[T](stream.graph, sinkOp)
    }

    def sink[T](
        sink: Class[_ <: Task], parallism: Int, conf: UserConfig = UserConfig.empty,
        description: String = null): Stream[T] = {
      val sinkOp = ProcessorOp(sink, parallism, conf, Option(description).getOrElse("source"))
      stream.graph.addVertex(sinkOp)
      stream.graph.addEdge(stream.thisNode, Shuffle, sinkOp)
      new Stream[T](stream.graph, sinkOp)
    }
  }
}

/** A DataSink that logs every message it receives to the task's logger. */
class LoggerSink[T] extends DataSink {
  var logger: Logger = null

  // The unused `private var context` field was removed: open() never assigned it
  // and nothing read it.
  override def open(context: TaskContext): Unit = {
    this.logger = context.logger
  }

  override def write(message: Message): Unit = {
    logger.info("logging message " + message.msg)
  }

  // Fixed: was `= Unit`, which returned the Unit *companion object* and relied
  // on value discarding; `()` is the actual unit value.
  override def close(): Unit = ()
}
