http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ExecutorManager.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ExecutorManager.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ExecutorManager.scala new file mode 100644 index 0000000..79a65c4 --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ExecutorManager.scala @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.streaming.appmaster + +import scala.concurrent.duration._ +import scala.util.{Failure, Try} + +import akka.actor.SupervisorStrategy.Stop +import akka.actor._ +import akka.remote.RemoteScope +import com.typesafe.config.Config +import org.apache.commons.lang.exception.ExceptionUtils + +import org.apache.gearpump.cluster.AppMasterToWorker.ChangeExecutorResource +import org.apache.gearpump.cluster.appmaster.ExecutorSystemScheduler.{ExecutorSystemJvmConfig, ExecutorSystemStarted, StartExecutorSystemTimeout, StartExecutorSystems} +import org.apache.gearpump.cluster.appmaster.WorkerInfo +import org.apache.gearpump.cluster.scheduler.{Resource, ResourceRequest} +import org.apache.gearpump.cluster.worker.WorkerId +import org.apache.gearpump.cluster.{AppJar, AppMasterContext, ExecutorContext, UserConfig} +import org.apache.gearpump.streaming.ExecutorId +import org.apache.gearpump.streaming.ExecutorToAppMaster.RegisterExecutor +import org.apache.gearpump.streaming.appmaster.ExecutorManager._ +import org.apache.gearpump.streaming.executor.Executor +import org.apache.gearpump.util.{LogUtil, Util} + +/** + * ExecutorManager manage the start and stop of all executors. + * + * ExecutorManager will launch Executor when asked. It hide the details like starting + * a new ExecutorSystem from user. 
Please use ExecutorManager.props() to construct this actor + */ +private[appmaster] class ExecutorManager( + userConfig: UserConfig, + appContext: AppMasterContext, + executorFactory: (ExecutorContext, UserConfig, Address, ExecutorId) => Props, + clusterConfig: Config, + appName: String) + extends Actor { + + private val LOG = LogUtil.getLogger(getClass) + + import appContext.{appId, masterProxy, username} + + private var taskManager: ActorRef = null + + private implicit val actorSystem = context.system + private val systemConfig = context.system.settings.config + + private var executors = Map.empty[Int, ExecutorInfo] + + def receive: Receive = waitForTaskManager + + def waitForTaskManager: Receive = { + case SetTaskManager(taskManager) => + this.taskManager = taskManager + context.become(service orElse terminationWatch) + } + + // If something wrong on executor, ExecutorManager will stop the current executor, + // and wait for AppMaster to start a new executor. + override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, + withinTimeRange = 1.minute) { + case ex: Throwable => + val executorId = Try(sender.path.name.toInt) + executorId match { + case scala.util.Success(id) => { + executors -= id + LOG.error(s"Executor $id throws exception, stop it...\n" + + ExceptionUtils.getStackTrace(ex)) + } + case Failure(ex) => { + LOG.error(s"Sender ${sender.path} is dead, but seems it is not an executor...") + } + } + Stop + } + + // Responds to outside queries + def service: Receive = { + case StartExecutors(resources, jar) => + masterProxy ! 
StartExecutorSystems(resources, getExecutorJvmConfig(Some(jar))) + case ExecutorSystemStarted(executorSystem, boundedJar) => + import executorSystem.{address, executorSystemId, resource => executorResource, worker} + + val executorId = executorSystemId + val executorContext = ExecutorContext(executorId, worker, appId, appName, + appMaster = context.parent, executorResource) + executors += executorId -> ExecutorInfo(executorId, null, worker, boundedJar) + + // Starts executor + val executor = context.actorOf(executorFactory(executorContext, userConfig, + address, executorId), executorId.toString) + executorSystem.bindLifeCycleWith(executor) + case StartExecutorSystemTimeout => + taskManager ! StartExecutorsTimeOut + + case RegisterExecutor(executor, executorId, resource, worker) => + LOG.info(s"executor $executorId has been launched") + // Watches for executor termination + context.watch(executor) + val executorInfo = executors.get(executorId).get + executors += executorId -> executorInfo.copy(executor = executor) + taskManager ! ExecutorStarted(executorId, resource, worker.workerId, executorInfo.boundedJar) + + // Broadcasts message to all executors + case BroadCast(msg) => + LOG.info(s"Broadcast ${msg.getClass.getSimpleName} to all executors") + context.children.foreach(_ forward msg) + + // Unicasts message to single executor + case UniCast(executorId, msg) => + LOG.info(s"Unicast ${msg.getClass.getSimpleName} to executor $executorId") + val executor = executors.get(executorId) + executor.foreach(_.executor forward msg) + + case GetExecutorInfo => + sender ! executors + + // Tells Executor manager resources that are occupied. 
The Executor Manager can use this + // information to tell worker to reclaim un-used resources + case ExecutorResourceUsageSummary(resources) => + executors.foreach { pair => + val (executorId, executor) = pair + val resource = resources.get(executorId) + val worker = executor.worker.ref + // Notifies the worker the actual resource used by this application. + resource match { + case Some(resource) => + worker ! ChangeExecutorResource(appId, executorId, resource) + case None => + worker ! ChangeExecutorResource(appId, executorId, Resource(0)) + } + } + } + + def terminationWatch: Receive = { + case Terminated(actor) => + val executorId = Try(actor.path.name.toInt) + executorId match { + case scala.util.Success(id) => { + executors -= id + LOG.error(s"Executor $id is down") + taskManager ! ExecutorStopped(id) + } + case scala.util.Failure(ex) => + LOG.error(s"failed to get the executor Id from path string ${actor.path}", ex) + } + } + + private def getExecutorJvmConfig(jar: Option[AppJar]): ExecutorSystemJvmConfig = { + val executorAkkaConfig = clusterConfig + val jvmSetting = Util.resolveJvmSetting(executorAkkaConfig.withFallback(systemConfig)).executor + + ExecutorSystemJvmConfig(jvmSetting.classPath, jvmSetting.vmargs, jar, + username, executorAkkaConfig) + } +} + +private[appmaster] object ExecutorManager { + case class StartExecutors(resources: Array[ResourceRequest], jar: AppJar) + case class BroadCast(msg: Any) + + case class UniCast(executorId: Int, msg: Any) + + case object GetExecutorInfo + + case class ExecutorStarted( + executorId: Int, resource: Resource, workerId: WorkerId, boundedJar: Option[AppJar]) + case class ExecutorStopped(executorId: Int) + + case class SetTaskManager(taskManager: ActorRef) + + case object StartExecutorsTimeOut + + def props( + userConfig: UserConfig, appContext: AppMasterContext, clusterConfig: Config, appName: String) + : Props = { + val executorFactory = + (executorContext: ExecutorContext, + userConfig: UserConfig, + 
address: Address, + executorId: ExecutorId) => + Props(classOf[Executor], executorContext, userConfig) + .withDeploy(Deploy(scope = RemoteScope(address))) + + Props(new ExecutorManager(userConfig, appContext, executorFactory, clusterConfig, appName)) + } + + case class ExecutorResourceUsageSummary(resources: Map[ExecutorId, Resource]) + + case class ExecutorInfo( + executorId: ExecutorId, executor: ActorRef, worker: WorkerInfo, boundedJar: Option[AppJar]) +}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/JarScheduler.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/JarScheduler.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/JarScheduler.scala new file mode 100644 index 0000000..6de5306 --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/JarScheduler.scala @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gearpump.streaming.appmaster + +import akka.actor._ +import akka.pattern.ask +import com.typesafe.config.Config +import org.apache.gearpump.TimeStamp +import org.apache.gearpump.cluster.AppJar +import org.apache.gearpump.cluster.scheduler.{Resource, ResourceRequest} +import org.apache.gearpump.cluster.worker.WorkerId +import org.apache.gearpump.partitioner.PartitionerDescription +import org.apache.gearpump.streaming.appmaster.JarScheduler._ +import org.apache.gearpump.streaming.task.TaskId +import org.apache.gearpump.streaming.{DAG, ProcessorDescription} +import org.apache.gearpump.util.{Constants, Graph, LogUtil} + +import scala.concurrent.Future + +/** + * Different processors of the stream application can use different jars. JarScheduler is the + * scheduler for different jars. + * + * For a DAG of multiple processors, each processor can have its own jar. Tasks of same jar + * is scheduled by TaskScheduler, and TaskSchedulers are scheduled by JarScheduler. + * + * In runtime, the implementation is delegated to actor JarSchedulerImpl + */ +class JarScheduler(appId: Int, appName: String, config: Config, factory: ActorRefFactory) { + private val actor: ActorRef = factory.actorOf(Props(new JarSchedulerImpl(appId, appName, config))) + private implicit val dispatcher = factory.dispatcher + private implicit val timeout = Constants.FUTURE_TIMEOUT + + /** Set the current DAG version active */ + def setDag(dag: DAG, startClock: Future[TimeStamp]): Unit = { + actor ! TransitToNewDag + startClock.map { start => + actor ! NewDag(dag, start) + } + } + + /** AppMaster ask JarScheduler about how many resource it wants */ + def getResourceRequestDetails(): Future[Array[ResourceRequestDetail]] = { + (actor ? GetResourceRequestDetails).asInstanceOf[Future[Array[ResourceRequestDetail]]] + } + + /** + * AppMaster has resource allocated, and ask the jar scheduler to schedule tasks + * for this executor. 
+ */ + def scheduleTask(appJar: AppJar, workerId: WorkerId, executorId: Int, resource: Resource) + : Future[List[TaskId]] = { + (actor ? ScheduleTask(appJar, workerId, executorId, resource)) + .asInstanceOf[Future[List[TaskId]]] + } + + /** + * Some executor JVM process is dead. AppMaster asks jar scheduler to re-schedule the impacted + * tasks. + */ + def executorFailed(executorId: Int): Future[Option[ResourceRequestDetail]] = { + (actor ? ExecutorFailed(executorId)).asInstanceOf[Future[Option[ResourceRequestDetail]]] + } +} + +object JarScheduler { + + case class ResourceRequestDetail(jar: AppJar, requests: Array[ResourceRequest]) + + case class NewDag(dag: DAG, startTime: TimeStamp) + + case object TransitToNewDag + + case object GetResourceRequestDetails + + /** + * Schedule tasks for one appJar. + * + * @param appJar Application jar. + * @param workerId Worker machine Id. + * @param executorId Executor Id. + * @param resource Slots that are available. + */ + case class ScheduleTask(appJar: AppJar, workerId: WorkerId, executorId: Int, resource: Resource) + + /** Some executor JVM is dead, try to recover tasks that are located on failed executor */ + case class ExecutorFailed(executorId: Int) + + class JarSchedulerImpl(appId: Int, appName: String, config: Config) extends Actor with Stash { + + // Each TaskScheduler maps to a jar. + private var taskSchedulers = Map.empty[AppJar, TaskScheduler] + + private val LOG = LogUtil.getLogger(getClass) + + def receive: Receive = waitForNewDag + + def waitForNewDag: Receive = { + case TransitToNewDag => // Continue current state + case NewDag(dag, startTime) => + + LOG.info(s"Init JarScheduler, dag version: ${dag.version}, startTime: $startTime") + + val processors = dag.processors.values.groupBy(_.jar) + + taskSchedulers = processors.map { jarAndProcessors => + val (jar, processors) = jarAndProcessors + // Construct the sub DAG, each sub DAG maps to a separate jar. 
+ val subGraph = Graph.empty[ProcessorDescription, PartitionerDescription] + processors.foreach { processor => + if (startTime < processor.life.death) { + subGraph.addVertex(processor) + } + } + val subDagForSingleJar = DAG(subGraph) + + val taskScheduler = taskSchedulers + .getOrElse(jar, new TaskSchedulerImpl(appId, appName, config)) + + LOG.info(s"Set DAG for TaskScheduler, count: " + subDagForSingleJar.processors.size) + taskScheduler.setDAG(subDagForSingleJar) + jar -> taskScheduler + } + unstashAll() + context.become(ready) + case other => + stash() + } + + def ready: Receive = { + // Notifies there is a new DAG coming. + case TransitToNewDag => + context.become(waitForNewDag) + + case GetResourceRequestDetails => + + // Asks each TaskScheduler (Each for one jar) the resource requests. + val result: Array[ResourceRequestDetail] = taskSchedulers.map { jarAndScheduler => + val (jar, scheduler) = jarAndScheduler + ResourceRequestDetail(jar, scheduler.getResourceRequests()) + }.toArray + LOG.info(s"GetRequestDetails " + result.mkString(";")) + sender ! result + + case ScheduleTask(appJar, workerId, executorId, resource) => + val result: List[TaskId] = taskSchedulers.get(appJar).map { scheduler => + scheduler.schedule(workerId, executorId, resource) + }.getOrElse(List.empty) + LOG.info(s"ScheduleTask " + result.mkString(";")) + sender ! result + case ExecutorFailed(executorId) => + val result: Option[ResourceRequestDetail] = taskSchedulers. + find(_._2.scheduledTasks(executorId).nonEmpty).map { jarAndScheduler => + ResourceRequestDetail(jarAndScheduler._1, jarAndScheduler._2.executorFailed(executorId)) + } + LOG.info(s"ExecutorFailed " + result.mkString(";")) + sender ! 
result + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/StreamAppMasterSummary.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/StreamAppMasterSummary.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/StreamAppMasterSummary.scala new file mode 100644 index 0000000..3d43ee7 --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/StreamAppMasterSummary.scala @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.streaming.appmaster + +import org.apache.gearpump._ +import org.apache.gearpump.cluster.AppMasterToMaster.AppMasterSummary +import org.apache.gearpump.cluster.MasterToAppMaster.AppMasterStatus +import org.apache.gearpump.cluster.{MasterToAppMaster, UserConfig} +import org.apache.gearpump.streaming.appmaster.AppMaster.ExecutorBrief +import org.apache.gearpump.streaming.{ExecutorId, LifeTime, ProcessorId} +import org.apache.gearpump.util.Graph +import org.apache.gearpump.util.HistoryMetricsService.HistoryMetricsConfig + +/** Stream application summary, used for REST API */ +case class StreamAppMasterSummary( + appType: String = "streaming", + appId: Int, + appName: String = null, + actorPath: String = null, + clock: TimeStamp = 0L, + status: AppMasterStatus = MasterToAppMaster.AppMasterActive, + startTime: TimeStamp = 0L, + uptime: TimeStamp = 0L, + user: String = null, + homeDirectory: String = "", + logFile: String = "", + dag: Graph[ProcessorId, String] = null, + executors: List[ExecutorBrief] = null, + processors: Map[ProcessorId, ProcessorSummary] = Map.empty[ProcessorId, ProcessorSummary], + // Hiearachy level for each processor + processorLevels: Map[ProcessorId, Int] = Map.empty[ProcessorId, Int], + historyMetricsConfig: HistoryMetricsConfig = null) + extends AppMasterSummary + +case class TaskCount(count: Int) + +case class ProcessorSummary( + id: ProcessorId, + taskClass: String, + parallelism: Int, + description: String, + taskConf: UserConfig, + life: LifeTime, + executors: List[ExecutorId], + taskCount: Map[ExecutorId, TaskCount]) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskLocator.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskLocator.scala 
b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskLocator.scala new file mode 100644 index 0000000..87630b7 --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskLocator.scala @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
 */
package org.apache.gearpump.streaming.appmaster

import com.typesafe.config.{Config, ConfigFactory, ConfigRenderOptions, ConfigValueFactory}
import org.apache.gearpump.cluster.worker.WorkerId
import org.apache.gearpump.streaming.appmaster.TaskLocator.{Localities, Locality, NonLocality, WorkerLocality}
import org.apache.gearpump.streaming.task.TaskId

import scala.collection.JavaConverters._
import scala.util.Try

/**
 * TaskLocator is used to decide which machine a task should run on.
 *
 * Users can configure [[org.apache.gearpump.streaming.Constants#GEARPUMP_STREAMING_LOCALITIES]]
 * to control which machine each task runs on.
 */
class TaskLocator(appName: String, config: Config) {
  private val taskLocalities: Map[TaskId, Locality] = loadTaskLocalities(config)

  /** Finds where a task should run; NonLocality when no preference is configured. */
  def locateTask(taskId: TaskId): Locality = {
    taskLocalities.getOrElse(taskId, NonLocality)
  }

  /**
   * Loads the task-to-worker mapping configured under
   * `GEARPUMP_STREAMING_LOCALITIES.<appName>`.
   *
   * A missing section, or one that fails to parse, yields an empty mapping: both the
   * config lookup and the parsing run inside Try.
   */
  private def loadTaskLocalities(config: Config): Map[TaskId, Locality] = {
    import org.apache.gearpump.streaming.Constants.GEARPUMP_STREAMING_LOCALITIES
    Try(config.getConfig(s"$GEARPUMP_STREAMING_LOCALITIES.$appName")).map { appConfig =>
      val json = appConfig.root().render(ConfigRenderOptions.concise)
      Localities.fromJson(json)
    }.map { localityConfig =>
      // Inverts worker -> tasks into task -> WorkerLocality(worker).
      localityConfig.localities.toSeq.flatMap { case (workerId, tasks) =>
        tasks.map((_, WorkerLocality(workerId)))
      }.toMap
    }.getOrElse(Map.empty[TaskId, Locality])
  }
}

object TaskLocator {

  /** Placement preference of a task. */
  trait Locality

  /** Means we require the resource from the specific worker */
  case class WorkerLocality(workerId: WorkerId) extends Locality

  /** Means no preference on worker */
  object NonLocality extends Locality

  /** Localities settings. Mapping from workerId to list of taskId */
  case class Localities(localities: Map[WorkerId, Array[TaskId]])

  object Localities {
    val pattern = "task_([0-9]+)_([0-9]+)".r

    // To avoid polluting the classpath, we do the JSON translation ourselves instead of
    // introducing JSON library dependencies directly.

    /**
     * Parses the format produced by toJson: a "localities" object that maps each rendered
     * WorkerId to a comma-separated list of "task_<processorId>_<index>" entries.
     *
     * Whitespace around each entry is ignored and empty entries are skipped, so
     * "task_0_0, task_0_1" is accepted. Any other malformed entry still fails with a
     * MatchError.
     */
    def fromJson(json: String): Localities = {
      val localities = ConfigFactory.parseString(json).getAnyRef("localities")
        .asInstanceOf[java.util.Map[String, String]].asScala.map { case (worker, taskList) =>
          val workerId: WorkerId = WorkerId.parse(worker)
          val tasks = taskList.split(",").map(_.trim).filter(_.nonEmpty).map { task =>
            val pattern(processorId, taskIndex) = task
            TaskId(processorId.toInt, taskIndex.toInt)
          }
          (workerId, tasks)
        }.toMap
      Localities(localities)
    }

    /** Renders localities in the format accepted by fromJson. */
    def toJson(localities: Localities): String = {
      val map = localities.localities.toList.map { case (workerId, tasks) =>
        (WorkerId.render(workerId),
          tasks.map(task => s"task_${task.processorId}_${task.index}").mkString(","))
      }.toMap.asJava
      ConfigFactory.empty().withValue("localities", ConfigValueFactory.fromAnyRef(map))
        .root.render(ConfigRenderOptions.concise())
    }
  }
}
\ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskManager.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskManager.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskManager.scala new file mode 100644 index 0000000..662418c --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskManager.scala @@ -0,0 +1,496 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.appmaster + +import akka.actor._ +import akka.pattern.ask +import org.apache.gearpump.TimeStamp +import org.apache.gearpump.cluster.MasterToAppMaster.ReplayFromTimestampWindowTrailingEdge +import org.apache.gearpump.streaming.AppMasterToExecutor._ +import org.apache.gearpump.streaming.ExecutorToAppMaster.{MessageLoss, RegisterTask, UnRegisterTask} +import org.apache.gearpump.streaming._ +import org.apache.gearpump.streaming.appmaster.AppMaster.{AllocateResourceTimeOut, LookupTaskActorRef, TaskActorRef} +import org.apache.gearpump.streaming.appmaster.ClockService.{ChangeToNewDAG, ChangeToNewDAGSuccess} +import org.apache.gearpump.streaming.appmaster.DagManager.{GetLatestDAG, GetTaskLaunchData, LatestDAG, NewDAGDeployed, TaskLaunchData, WatchChange} +import org.apache.gearpump.streaming.appmaster.ExecutorManager.{ExecutorStarted, StartExecutorsTimeOut, _} +import org.apache.gearpump.streaming.appmaster.TaskManager._ +import org.apache.gearpump.streaming.appmaster.TaskRegistry.{Accept, TaskLocation} +import org.apache.gearpump.streaming.executor.Executor.RestartTasks +import org.apache.gearpump.streaming.executor.ExecutorRestartPolicy +import org.apache.gearpump.streaming.task._ +import org.apache.gearpump.streaming.util.ActorPathUtil +import org.apache.gearpump.util.{Constants, LogUtil} +import org.slf4j.Logger + +import scala.concurrent.Future +import scala.concurrent.duration._ + +/** + * + * TaskManager track all tasks's status. + * + * It is state machine with three states: + * 1. applicationReady + * 2. 
recovery + * 3. dynamicDag + * + * When in state applicationReady: + * 1. When there is message-loss or JVM crash, transit to state recovery. + * 2. When user modify the DAG, transit to dynamicDag. + * + * When in state recovery: + * 1. When all tasks has been recovered, transit to applicationReady. + * + * When in state dynamicDag: + * 1. When dynamic dag transition is complete, transit to applicationReady. + * 2. When there is message loss or JVM crash, transit to state recovery. + */ +private[appmaster] class TaskManager( + appId: Int, + dagManager: ActorRef, + jarScheduler: JarScheduler, + executorManager: ActorRef, + clockService: ActorRef, + appMaster: ActorRef, + appName: String) + extends Actor { + + private val LOG: Logger = LogUtil.getLogger(getClass, app = appId) + private val systemConfig = context.system.settings.config + + private val ids = new SessionIdFactory() + + private val executorRestartPolicy = new ExecutorRestartPolicy(maxNrOfRetries = 5, + withinTimeRange = 20.seconds) + + private implicit val timeout = Constants.FUTURE_TIMEOUT + private implicit val actorSystem = context.system + + import context.dispatcher + + dagManager ! WatchChange(watcher = self) + executorManager ! SetTaskManager(self) + + private def getStartClock: Future[TimeStamp] = { + (clockService ? GetStartClock).asInstanceOf[Future[StartClock]].map(_.clock) + } + + private var startClock: Future[TimeStamp] = getStartClock + + def receive: Receive = applicationReady(DagReadyState.empty) + + private def onClientQuery(taskRegistry: TaskRegistry): Receive = { + case clock: ClockEvent => + clockService forward clock + case GetTaskList => + sender ! 
TaskList(taskRegistry.getTaskExecutorMap) + case LookupTaskActorRef(taskId) => + val executorId = taskRegistry.getExecutorId(taskId) + val requestor = sender + executorId.map { executorId => + val taskPath = ActorPathUtil.taskActorPath(appMaster, executorId, taskId) + context.actorSelection(taskPath).resolveOne(3.seconds).map { taskActorRef => + requestor ! TaskActorRef(taskActorRef) + } + } + } + + /** + * State applicationReady + */ + def applicationReady(state: DagReadyState): Receive = { + executorManager ! state.taskRegistry.usedResource + dagManager ! NewDAGDeployed(state.dag.version) + dagManager ! GetLatestDAG + LOG.info(s"goto state ApplicationReady(dag = ${state.dag.version})...") + + val recoverRegistry = new TaskRegistry(expectedTasks = state.dag.tasks, + deadTasks = state.taskRegistry.deadTasks) + + val recoverState = new StartDagState(state.dag, recoverRegistry) + + val onError: Receive = { + case executorStopped@ExecutorStopped(executorId) => + if (state.taskRegistry.isTaskRegisteredForExecutor(executorId)) { + self ! executorStopped + context.become(recovery(recoverState)) + } + case MessageLoss(executorId, taskId, cause) => + if (state.taskRegistry.isTaskRegisteredForExecutor(executorId) && + executorRestartPolicy.allowRestartExecutor(executorId)) { + context.become(recovery(recoverState)) + } else { + val errorMsg = s"Task $taskId fails too many times to recover" + appMaster ! FailedToRecover(errorMsg) + } + case replay: ReplayFromTimestampWindowTrailingEdge => + LOG.error(s"Received $replay") + context.become(recovery(recoverState)) + } + + val onNewDag: Receive = { + case LatestDAG(newDag) => + + if (newDag.version > state.dag.version) { + + executorManager ! 
BroadCast(StartDynamicDag(newDag.version)) + LOG.info("Broadcasting StartDynamicDag") + + val dagDiff = migrate(state.dag, newDag) + jarScheduler.setDag(newDag, startClock) + val resourceRequestsDetails = jarScheduler.getResourceRequestDetails() + resourceRequestsDetails.map { details => + details.foreach { detail => + if (detail.requests.length > 0 && detail.requests.exists(!_.resource.isEmpty)) { + executorManager ! StartExecutors(detail.requests, detail.jar) + } + } + } + + var modifiedTasks = List.empty[TaskId] + for (processorId <- dagDiff.modifiedProcessors ++ dagDiff.impactedUpstream) { + val executors = state.taskRegistry.processorExecutors(processorId) + executors.foreach { pair => + val (executorId, tasks) = pair + modifiedTasks ++= tasks + dagManager ! GetTaskLaunchData(newDag.version, processorId, + ChangeTasksOnExecutor(executorId, tasks)) + } + } + + val taskChangeRegistry = new TaskChangeRegistry(modifiedTasks) + + val deadTasks = state.taskRegistry.deadTasks + val registeredTasks = state.taskRegistry.registeredTasks + val dynamicTaskRegistry = new TaskRegistry(newDag.tasks, registeredTasks, deadTasks) + + val nextState = new StartDagState(newDag, dynamicTaskRegistry, taskChangeRegistry) + context.become(dynamicDag(nextState, recoverState)) + } + } + + /** + * Handles UnRegisterTask: replies StopTask to the sender, moves the task from the registered + * set to the dead set of the registry, and stays in applicationReady with the updated registry. + */ + val onUnRegisterTask: Receive = { + case unRegister: UnRegisterTask => + + LOG.info(s"Received $unRegister, stop task ${unRegister.taskId}") + sender !
StopTask(unRegister.taskId) + + val taskId = unRegister.taskId + val registry = state.taskRegistry + val deadTasks = registry.deadTasks + + val newRegistry = registry.copy(registeredTasks = registry.registeredTasks - taskId, + deadTasks = deadTasks + taskId) + + val newState = new DagReadyState(state.dag, newRegistry) + context.become(applicationReady(newState)) + } + + // Recovers to same version + onClientQuery(state.taskRegistry) orElse onError orElse onNewDag orElse + onUnRegisterTask orElse unHandled("applicationReady") + } + + /** + * State dynamicDag: a new DAG version (`state`) is being started. ExecutorStopped, or a + * MessageLoss for an executor that has registered tasks and whose restart policy still allows a + * restart, switches to recovery of `recoverState`; any other MessageLoss reports + * FailedToRecover to the AppMaster. + */ + def dynamicDag(state: StartDagState, recoverState: StartDagState): Receive = { + LOG.info(s"DynamicDag transit to dag version: ${state.dag.version}...") + + val onMessageLoss: Receive = { + case executorStopped@ExecutorStopped(executorId) => + context.become(recovery(recoverState)) + case MessageLoss(executorId, taskId, cause) => + if (state.taskRegistry.isTaskRegisteredForExecutor(executorId) && + executorRestartPolicy.allowRestartExecutor(executorId)) { + context.become(recovery(recoverState)) + } else { + val errorMsg = s"Task $taskId fails too many times to recover" + appMaster ! FailedToRecover(errorMsg) + } + } + + onMessageLoss orElse onClientQuery(state.taskRegistry) orElse + startDag(state, recoverState) orElse unHandled("dynamicDag") + } + + /** + * Handlers shared by dynamicDag and recovery while a DAG version is being started. They: + * schedule tasks on newly started executors through jarScheduler and ask dagManager for their + * launch data; forward StartExecutorsTimeOut to the AppMaster as AllocateResourceTimeOut; + * launch or change tasks on executors; track task registration and change acknowledgements; + * broadcast task locations once all tasks are ready; and, after ChangeToNewDAGSuccess with + * every executor having acknowledged the locations, broadcast StartAllTasks and switch to + * applicationReady. A TaskLocationsRejected switches to recovery of `recoverState`. + */ + private def startDag(state: StartDagState, recoverState: StartDagState): Receive = { + case executor: ExecutorStarted => + import executor.{boundedJar, executorId, resource, workerId} + val taskIdsFuture = jarScheduler.scheduleTask(boundedJar.get, workerId, executorId, resource) + taskIdsFuture.foreach { taskIds => + LOG.info(s"Executor $executor has been started, " + + s"start to schedule tasks: ${taskIds.mkString(",")}") + taskIds.groupBy(_.processorId).foreach { pair => + val (processorId, tasks) = pair + dagManager !
GetTaskLaunchData(state.dag.version, processorId, + StartTasksOnExecutor(executor.executorId, tasks)) + } + } + + case StartExecutorsTimeOut => + appMaster ! AllocateResourceTimeOut + case TaskLaunchData(processorDescription, subscribers, command) => + command match { + case StartTasksOnExecutor(executorId, tasks) => + LOG.info(s"Start tasks on Executor($executorId), tasks: " + tasks) + val launchTasks = LaunchTasks(tasks, state.dag.version, processorDescription, subscribers) + executorManager ! UniCast(executorId, launchTasks) + tasks.foreach(executorRestartPolicy.addTaskToExecutor(executorId, _)) + case ChangeTasksOnExecutor(executorId, tasks) => + LOG.info("change Task on executor: " + executorId + ", tasks: " + tasks) + val changeTasks = ChangeTasks(tasks, state.dag.version, processorDescription.life, + subscribers) + executorManager ! UniCast(executorId, changeTasks) + case other => + LOG.error(s"severe error! we expect ExecutorStarted but get ${other.getClass.toString}") + } + case TasksLaunched => + // We will track all launched task by message RegisterTask + case TasksChanged(tasks) => + tasks.foreach(task => state.taskChangeRegistry.taskChanged(task)) + + if (allTasksReady(state)) { + broadcastLocations(state) + } + + case RegisterTask(taskId, executorId, host) => + val client = sender + val register = state.taskRegistry + val status = register.registerTask(taskId, TaskLocation(executorId, host)) + if (status == Accept) { + LOG.info(s"RegisterTask($taskId) TaskLocation: $host, Executor: $executorId") + val sessionId = ids.newSessionId + + startClock.foreach(clock => client ! TaskRegistered(taskId, sessionId, clock)) + if (allTasksReady(state)) { + broadcastLocations(state) + } + } else { + sender !
TaskRejected(taskId) + } + + case TaskChanged(taskId, dagVersion) => + state.taskChangeRegistry.taskChanged(taskId) + if (allTasksReady(state)) { + broadcastLocations(state) + } + case locationReceived: TaskLocationsReceived => + state.executorReadyRegistry.registerExecutor(locationReceived.executorId) + if (allTasksReady(state) && + state.executorReadyRegistry.allRegistered(state.taskRegistry.executors)) { + LOG.info("All executors are ready to start...") + clockService ! ChangeToNewDAG(state.dag) + } + case locationRejected: TaskLocationsRejected => + LOG.error(s"received $locationRejected, start to recover") + context.become(recovery(recoverState)) + + case ChangeToNewDAGSuccess(_) => + if (allTasksReady(state) && + state.executorReadyRegistry.allRegistered(state.taskRegistry.executors)) { + executorManager ! BroadCast(StartAllTasks(state.dag.version)) + context.become(applicationReady(new DagReadyState(state.dag, state.taskRegistry))) + } + } + + /** + * Handles ExecutorStopped: if the restart policy allows restarting the executor, asks the + * ExecutorManager to start executors for the resource request detail (if any) that + * jarScheduler.executorFailed yields for it; otherwise reports FailedToRecover to the AppMaster. + */ + def onExecutorError: Receive = { + case ExecutorStopped(executorId) => + if (executorRestartPolicy.allowRestartExecutor(executorId)) { + jarScheduler.executorFailed(executorId).foreach { resourceRequestDetail => + if (resourceRequestDetail.isDefined) { + executorManager ! StartExecutors(resourceRequestDetail.get.requests, + resourceRequestDetail.get.jar) + } + } + } else { + val errorMsg = s"Executor restarted too many times to recover" + appMaster ! FailedToRecover(errorMsg) + } + } + + /** + * True when every expected, non-dead task has registered and every task awaiting a dynamic + * change has acknowledged the change. + */ + private def allTasksReady(state: StartDagState): Boolean = { + import state.{taskChangeRegistry, taskRegistry} + taskRegistry.isAllTasksRegistered && taskChangeRegistry.allTaskChanged + } + + /** Broadcasts the registered task locations, tagged with the DAG version, to all executors. */ + private def broadcastLocations(state: StartDagState): Unit = { + LOG.info(s"All tasks have been launched; send Task locations to all executors") + val taskLocations = state.taskRegistry.getTaskLocations + executorManager !
BroadCast(TaskLocationsReady(taskLocations, state.dag.version)) + } + + /** + * State recovery: broadcasts RestartTasks for the DAG version being recovered, refreshes + * startClock and re-sets the DAG on jarScheduler. An empty DAG goes straight to + * applicationReady. Otherwise clock events are ignored and the DAG is started again through + * startDag, with a fresh registry (same expected tasks, dead tasks kept) as the state to + * recover to if this attempt is rejected. + */ + def recovery(state: StartDagState): Receive = { + val recoverDagVersion = state.dag.version + executorManager ! BroadCast(RestartTasks(recoverDagVersion)) + + // Use new Start Clock so that we recover at timepoint we fails. + startClock = getStartClock + + jarScheduler.setDag(state.dag, startClock) + + LOG.info(s"goto state Recovery(recoverDag = $recoverDagVersion)...") + val ignoreClock: Receive = { + case clock: ClockEvent => + // Ignores clock events. + } + + if (state.dag.isEmpty) { + applicationReady(new DagReadyState(state.dag, state.taskRegistry)) + } else { + val registry = new TaskRegistry(expectedTasks = state.dag.tasks, + deadTasks = state.taskRegistry.deadTasks) + + val recoverState = new StartDagState(state.dag, registry) + ignoreClock orElse startDag(state, recoverState) orElse onExecutorError orElse + unHandled("recovery") + } + } + + /** Catch-all that logs, at INFO level, any message not handled in the named state. */ + private def unHandled(state: String): Receive = { + case other => + LOG.info(s"Received unknown message $other in state $state") + } +} + +private[appmaster] object TaskManager { + + /** + * When application is ready, then transit to DagReadyState + */ + class DagReadyState(val dag: DAG, val taskRegistry: TaskRegistry) + + object DagReadyState { + def empty: DagReadyState = { + new DagReadyState( + DAG.empty.copy(version = -1), + new TaskRegistry(List.empty[TaskId])) + } + } + + /** + * When application is booting up or doing recovery, it uses StartDagState + */ + class StartDagState( + val dag: DAG, + val taskRegistry: TaskRegistry, + val taskChangeRegistry: TaskChangeRegistry = new TaskChangeRegistry(List.empty[TaskId]), + val executorReadyRegistry: ExecutorRegistry = new ExecutorRegistry) + + case object GetTaskList + + case class TaskList(tasks: Map[TaskId, ExecutorId]) + + case class FailedToRecover(errorMsg: String) + + /** + * Starts new Tasks on Executor executorId + */ + case class
StartTasksOnExecutor(executorId: Int, tasks: List[TaskId]) + + /** + * Changes existing tasks on executor executorId + */ + case class ChangeTasksOnExecutor(executorId: Int, tasks: List[TaskId]) + + /** + * Tracks the registration of all new started executors. + */ + class ExecutorRegistry { + private var registeredExecutors = Set.empty[ExecutorId] + + def registerExecutor(executorId: ExecutorId): Unit = { + registeredExecutors += executorId + } + + /** True if every executor in `all` has registered; trivially true for an empty list. */ + def allRegistered(all: List[ExecutorId]): Boolean = { + all.forall(executor => registeredExecutors.contains(executor)) + } + } + + /** + * Tracks the registration of all changed tasks. + */ + class TaskChangeRegistry(targetTasks: List[TaskId]) { + private var registeredTasks = Set.empty[TaskId] + def taskChanged(taskId: TaskId): Unit = { + registeredTasks += taskId + } + def allTaskChanged: Boolean = { + targetTasks.forall(taskId => registeredTasks.contains(taskId)) + } + } + + object TaskChangeRegistry { + def empty: TaskChangeRegistry = new TaskChangeRegistry(List.empty[TaskId]) + } + + /** + * DAGDiff is used to track impacted processors when doing dynamic dag. + */ + case class DAGDiff( + addedProcessors: List[ProcessorId], + modifiedProcessors: List[ProcessorId], + impactedUpstream: List[ProcessorId]) + + /** + * Migrates from old DAG (leftDAG) to new DAG (rightDAG), return DAGDiff: + * addedProcessors are present only in rightDAG; modifiedProcessors are present in both but + * with differing processor descriptions; impactedUpstream are the direct upstream processors, + * in rightDAG, of the added and modified ones, excluding those processors themselves. + * Processors that exist only in leftDAG (removed) are not reported. + */ + def migrate(leftDAG: DAG, rightDAG: DAG): DAGDiff = { + val left = leftDAG.processors.keySet + val right = rightDAG.processors.keySet + + val added = right -- left + val join = right -- added + + val modified = join.filter { processorId => + leftDAG.processors(processorId) != rightDAG.processors(processorId) + } + + val upstream = (list: Set[ProcessorId]) => { + list.flatMap { processorId => + rightDAG.graph.incomingEdgesOf(processorId).map(_._1).toSet + } -- list + } + + val impactedUpstream = upstream(added ++ modified) + + // All upstream tasks are affected, and should be handled properly.
+ DAGDiff(added.toList, modified.toList, impactedUpstream.toList) + } + + /** + * Each new task will be assigned with a unique session Id. Ids start at 1 and increase by + * one per call to newSessionId. + */ + class SessionIdFactory { + private var nextSessionId = 1 + + /** + * Returns a new session Id for new task + */ + final def newSessionId: Int = { + val sessionId = nextSessionId + nextSessionId += 1 + sessionId + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskRegistry.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskRegistry.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskRegistry.scala new file mode 100644 index 0000000..b910d57 --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskRegistry.scala @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.appmaster
+
+import org.apache.gearpump.cluster.scheduler.Resource
+import org.apache.gearpump.streaming.appmaster.ExecutorManager.ExecutorResourceUsageSummary
+import org.apache.gearpump.streaming.appmaster.TaskRegistry._
+import org.apache.gearpump.streaming.task.TaskId
+import org.apache.gearpump.streaming.{ExecutorId, ProcessorId}
+import org.apache.gearpump.transport.HostPort
+import org.apache.gearpump.util.LogUtil
+import org.slf4j.Logger
+
+/**
+ * Tracks the registration of all tasks, when the application is booting up.
+ *
+ * @param expectedTasks tasks that are expected to register for the current DAG version
+ * @param registeredTasks tasks registered so far, with the location each one reported
+ * @param deadTasks tasks that are no longer expected to register; they are excluded
+ *                  from the check done by isAllTasksRegistered
+ */
+class TaskRegistry(val expectedTasks: List[TaskId],
+    var registeredTasks: Map[TaskId, TaskLocation] = Map.empty[TaskId, TaskLocation],
+    var deadTasks: Set[TaskId] = Set.empty[TaskId]) {
+
+  private val LOG: Logger = LogUtil.getLogger(getClass)
+
+  // Only tasks whose processor appears among the expected tasks may register.
+  private val processors = expectedTasks.map(_.processorId).toSet
+
+  /**
+   * When a task is booted, it need to call registerTask to register itself.
+   * The task is accepted when its processor is one of the processors of expectedTasks,
+   * otherwise it is rejected and not recorded. Registering an already registered task
+   * overwrites its previous location.
+   *
+   * @param taskId Task that register itself to TaskRegistry.
+   * @param location The host and port where this task is running on. NOTE: The host and port
+   *                 is NOT the same host and port of Akka remoting. Instead, it is host and port
+   *                 of custom netty layer, see [[org.apache.gearpump.transport.netty.Context]].
+   * @return Accept if the task was recorded, Reject otherwise
+   */
+  def registerTask(taskId: TaskId, location: TaskLocation): RegisterTaskStatus = {
+    val processorId = taskId.processorId
+
+    if (processors.contains(processorId)) {
+      registeredTasks += taskId -> location
+      Accept
+    } else {
+      LOG.error(s" the task is not accepted for registration, taskId: ${taskId}")
+      Reject
+    }
+  }
+
+  /** Returns a new registry with the given fields replaced, like a case-class copy. */
+  def copy(expectedTasks: List[TaskId] = this.expectedTasks,
+      registeredTasks: Map[TaskId, TaskLocation] = this.registeredTasks,
+      deadTasks: Set[TaskId] = this.deadTasks): TaskRegistry = {
+    new TaskRegistry(expectedTasks, registeredTasks, deadTasks)
+  }
+
+  /** Groups the registered tasks by the host (netty HostPort) they are running on. */
+  def getTaskLocations: TaskLocations = {
+    val taskLocations = registeredTasks.toList.groupBy(_._2.host).map { pair =>
+      val (k, v) = pair
+      val taskIds = v.map(_._1)
+      (k, taskIds.toSet)
+    }
+    TaskLocations(taskLocations)
+  }
+
+  /** Maps every registered task to the executor it is running on. */
+  def getTaskExecutorMap: Map[TaskId, ExecutorId] = {
+    registeredTasks.map { case (taskId, location) => (taskId, location.executorId) }
+  }
+
+  /** Query the executor Id where the task is running on */
+  def getExecutorId(taskId: TaskId): Option[Int] = {
+    registeredTasks.get(taskId).map(_.executorId)
+  }
+
+  /** Gets list of allocated executor Ids, each listed once */
+  def executors: List[ExecutorId] = {
+    // One entry per executor; several tasks usually share the same executor.
+    registeredTasks.values.map(_.executorId).toList.distinct
+  }
+
+  /** True when every expected task that is not dead has registered. */
+  def isAllTasksRegistered: Boolean = {
+    val aliveTasks = (expectedTasks.toSet -- deadTasks)
+    aliveTasks.forall(task => registeredTasks.contains(task))
+  }
+
+  /** True when at least one registered task runs on executorId. */
+  def isTaskRegisteredForExecutor(executorId: ExecutorId): Boolean = {
+    registeredTasks.exists(_._2.executorId == executorId)
+  }
+
+  /** Registered tasks that belong to processorId. */
+  private def filterTasks(processorId: ProcessorId): List[TaskId] = {
+    registeredTasks.keys.toList.filter(_.processorId == processorId)
+  }
+
+  /** Executors that the registered tasks of processorId are running on, with those tasks */
+  def processorExecutors(processorId: ProcessorId): Map[ExecutorId, List[TaskId]] = {
+    // filterTasks only returns keys of registeredTasks, so the lookup below always succeeds.
+    filterTasks(processorId).groupBy(taskId => registeredTasks(taskId).executorId)
+  }
+
+  /** Summary about how many resources are used for all running tasks (one slot per task) */
+  def usedResource: ExecutorResourceUsageSummary = {
+    val resourceMap = registeredTasks.foldLeft(Map.empty[ExecutorId, Resource]) { (map, task) =>
+      val resource = map.getOrElse(task._2.executorId, Resource(0)) + Resource(1)
+      map + (task._2.executorId -> resource)
+    }
+    ExecutorResourceUsageSummary(resourceMap)
+  }
+}
+
+object TaskRegistry {
+  /** Outcome of TaskRegistry.registerTask. */
+  sealed trait RegisterTaskStatus
+  case object Accept extends RegisterTaskStatus
+  case object Reject extends RegisterTaskStatus
+
+  /** Executor a task runs on, and the netty host/port it can be reached at. */
+  case class TaskLocation(executorId: Int, host: HostPort)
+
+  /** Registered tasks grouped by host. */
+  case class TaskLocations(locations: Map[HostPort, Set[TaskId]])
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskSchedulerImpl.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskSchedulerImpl.scala
new file mode 100644
index 0000000..df4490c
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskSchedulerImpl.scala
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.appmaster
+
+import com.typesafe.config.Config
+import org.apache.gearpump.cluster.scheduler.{Resource, ResourceRequest}
+import org.apache.gearpump.cluster.worker.WorkerId
+import org.apache.gearpump.streaming.DAG
+import org.apache.gearpump.streaming.appmaster.TaskLocator.{Locality, WorkerLocality}
+import org.apache.gearpump.streaming.appmaster.TaskScheduler.{Location, TaskStatus}
+import org.apache.gearpump.streaming.task.TaskId
+import org.apache.gearpump.util.Constants
+
+/**
+ * Schedules tasks to run for new allocated resources. TaskScheduler only schedules tasks that
+ * share the same jar. For scheduling for multiple jars, see
+ * [[org.apache.gearpump.streaming.appmaster.JarScheduler]].
+ */
+trait TaskScheduler {
+
+  /**
+   * Notifies the scheduler that the task DAG is created.
+   * @param dag task dag
+   */
+  def setDAG(dag: DAG): Unit
+
+  /**
+   * Gets the resource requirements for all unscheduled tasks.
+   */
+  def getResourceRequests(): Array[ResourceRequest]
+
+  /**
+   * Notifies the scheduler that a resource slot on {workerId} and {executorId} is allocated,
+   * and expects tasks to be scheduled in return.
+   * Task locality should be considered when deciding whether to offer a task on target {worker}
+   * and {executor}
+   *
+   * @param workerId which worker this resource is located.
+   * @param executorId which executorId this resource belongs to.
+   * @param resource the allocated resource; at most resource.slots tasks are scheduled
+   * @return a list of tasks
+   */
+  def schedule(workerId: WorkerId, executorId: Int, resource: Resource): List[TaskId]
+
+  /**
+   * Notifies the scheduler that {executorId} is failed, and expects a set of
+   * ResourceRequest for all failed tasks on that executor.
+   *
+   * @param executorId executor that failed
+   * @return resource requests of the failed executor
+   */
+  def executorFailed(executorId: Int): Array[ResourceRequest]
+
+  /**
+   * Queries the task list that already scheduled on the executor
+   *
+   * @param executorId executor to query
+   * @return a list of tasks
+   */
+  def scheduledTasks(executorId: Int): List[TaskId]
+}
+
+object TaskScheduler {
+  /** Worker and executor a task has been allocated to. */
+  case class Location(workerId: WorkerId, executorId: Int)
+
+  /** Scheduling state of one task; allocation is null while the task is unscheduled. */
+  class TaskStatus(val taskId: TaskId, val preferLocality: Locality, var allocation: Location)
+}
+
+class TaskSchedulerImpl(appId: Int, appName: String, config: Config) extends TaskScheduler {
+  private val executorNum = config.getInt(Constants.APPLICATION_EXECUTOR_NUMBER)
+
+  // Scheduling state of every task of the current DAG, sorted by task index.
+  private var tasks = List.empty[TaskStatus]
+
+  // Finds the locality of the tasks
+  private val taskLocator = new TaskLocator(appName, config)
+
+  /**
+   * Rebuilds the task list from `dag`. Tasks that were already known keep their existing status
+   * (including any allocation); new tasks start unallocated.
+   */
+  override def setDAG(dag: DAG): Unit = {
+    val taskMap = tasks.map(status => status.taskId -> status).toMap
+
+    tasks = dag.tasks.sortBy(_.index).map { taskId =>
+      val locality = taskLocator.locateTask(taskId)
+      taskMap.getOrElse(taskId, new TaskStatus(taskId, locality, allocation = null))
+    }
+  }
+
+  override def getResourceRequests(): Array[ResourceRequest] = {
+    fetchResourceRequests(fromOneWorker = false)
+  }
+
+  import org.apache.gearpump.cluster.scheduler.Relaxation._
+
+  /**
+   * Builds one request per preferred worker, with one slot per unscheduled task. Tasks without a
+   * worker preference are grouped under WorkerId.unspecified. `fromOneWorker` is currently unused.
+   */
+  private def fetchResourceRequests(fromOneWorker: Boolean = false): Array[ResourceRequest] = {
+    var workersResourceRequest = Map.empty[WorkerId, Resource]
+
+    tasks.filter(_.allocation == null).foreach { task =>
+      val workerId = task.preferLocality match {
+        case WorkerLocality(preferredWorker) => preferredWorker
+        case _ => WorkerId.unspecified
+      }
+      val current = workersResourceRequest.getOrElse(workerId, Resource.empty)
+      workersResourceRequest += workerId -> (current + Resource(1))
+    }
+
+    workersResourceRequest.map { workerIdAndResource =>
+      val (workerId, resource) = workerIdAndResource
+      if (workerId == WorkerId.unspecified) {
+        ResourceRequest(resource, workerId = WorkerId.unspecified, executorNum = executorNum)
+      } else {
+        ResourceRequest(resource, workerId, relaxation = SPECIFICWORKER)
+      }
+    }.toArray
+  }
+
+  override def schedule(workerId: WorkerId, executorId: Int, resource: Resource): List[TaskId] = {
+    var scheduledTasks = List.empty[TaskId]
+    val location = Location(workerId, executorId)
+    // Schedules tasks for specific worker
+    scheduledTasks ++= scheduleTasksForLocality(resource, location,
+      (locality) => locality == WorkerLocality(workerId))
+
+    // Schedules tasks without specific location preference
+    scheduledTasks ++= scheduleTasksForLocality(resource - Resource(scheduledTasks.length),
+      location, (locality) => true)
+    scheduledTasks
+  }
+
+  /**
+   * Allocates up to resource.slots unallocated tasks whose locality satisfies `matcher` to
+   * `resourceLocation`, scanning tasks in order. The returned list is in reverse scan order.
+   */
+  private def scheduleTasksForLocality(
+      resource: Resource, resourceLocation: Location, matcher: (Locality) => Boolean)
+    : List[TaskId] = {
+    var scheduledTasks = List.empty[TaskId]
+    var remain = resource.slots
+    // Walk an iterator: `tasks` is a List, so indexing it (and calling length) per step is O(n^2).
+    val candidates = tasks.iterator
+    while (candidates.hasNext && remain > 0) {
+      val taskStatus = candidates.next()
+      if (taskStatus.allocation == null && matcher(taskStatus.preferLocality)) {
+        taskStatus.allocation = resourceLocation
+        scheduledTasks +:= taskStatus.taskId
+        remain -= 1
+      }
+    }
+    scheduledTasks
+  }
+
+  override def executorFailed(executorId: Int): Array[ResourceRequest] = {
+    val failedTasks = tasksOnExecutor(executorId)
+    // Cleans the location of failed tasks so they can be scheduled again
+    failedTasks.foreach(_.allocation = null)
+
+    // NOTE(review): one request is returned even when no task ran on the executor, in which
+    // case it asks for Resource(0).
+    Array(ResourceRequest(Resource(failedTasks.length),
+      workerId = WorkerId.unspecified, relaxation = ONEWORKER))
+  }
+
+  override def scheduledTasks(executorId: Int): List[TaskId] = {
+    tasksOnExecutor(executorId).map(_.taskId)
+  }
+
+  /** Statuses of the tasks currently allocated to executorId, in task order. */
+  private def tasksOnExecutor(executorId: Int): List[TaskStatus] = {
+    tasks.filter { status =>
+      status.allocation != null && status.allocation.executorId == executorId
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/Stream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/Stream.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/Stream.scala
new file mode 100644
index 0000000..5ca92dd
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/Stream.scala
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.dsl
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.dsl.op._
+import org.apache.gearpump.streaming.sink.DataSink
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
+import org.apache.gearpump.util.Graph
+import org.slf4j.{Logger, LoggerFactory}
+
+import scala.language.implicitConversions
+
+/**
+ * A stream of values of type T in the DSL. A Stream refers to one node of an operation graph;
+ * every combinator adds a new operation (plus an edge from this node to it) to that graph in
+ * place and returns a Stream for the new node.
+ *
+ * @param graph the operation graph, shared by all streams derived from the same application
+ * @param thisNode the operation this stream refers to
+ * @param edge edge type used when connecting this node to the next operation; when None, each
+ *             combinator uses its own default (Direct or Shuffle)
+ */
+class Stream[T](
+    private val graph: Graph[Op, OpEdge], private val thisNode: Op,
+    private val edge: Option[OpEdge] = None) {
+
+  /**
+   * Converts a value[T] to a list of value[R]
+   *
+   * @param fun FlatMap function
+   * @param description The description message for this operation
+   * @return A new stream with type [R]
+   */
+  def flatMap[R](fun: T => TraversableOnce[R], description: String = null): Stream[R] = {
+    val flatMapOp = FlatMapOp(fun, Option(description).getOrElse("flatmap"))
+    graph.addVertex(flatMapOp)
+    graph.addEdge(thisNode, edge.getOrElse(Direct), flatMapOp)
+    new Stream[R](graph, flatMapOp)
+  }
+
+  /**
+   * Maps each message of type T to a message of type R. Results that are null are dropped.
+   *
+   * @param fun Function
+   * @return A new stream with type [R]
+   */
+  def map[R](fun: T => R, description: String = null): Stream[R] = {
+    this.flatMap({ data =>
+      Option(fun(data))
+    }, Option(description).getOrElse("map"))
+  }
+
+  /**
+   * Keeps records when fun(T) == true
+   *
+   * @param fun the filter
+   * @return a new stream after filter
+   */
+  def filter(fun: T => Boolean, description: String = null): Stream[T] = {
+    this.flatMap({ data =>
+      if (fun(data)) Option(data) else None
+    }, Option(description).getOrElse("filter"))
+  }
+
+  /**
+   * Reduces operations.
+   *
+   * @param fun reduction function
+   * @param description description message for this operator
+   * @return a new stream after reduction
+   */
+  def reduce(fun: (T, T) => T, description: String = null): Stream[T] = {
+    val reduceOp = ReduceOp(fun, Option(description).getOrElse("reduce"))
+    graph.addVertex(reduceOp)
+    graph.addEdge(thisNode, edge.getOrElse(Direct), reduceOp)
+    new Stream(graph, reduceOp)
+  }
+
+  /**
+   * Log to task log file: each message's toString is logged at INFO to the "dsl" logger.
+   */
+  def log(): Unit = {
+    this.map(msg => LoggerFactory.getLogger("dsl").info(msg.toString), "log")
+  }
+
+  /**
+   * Merges data from two stream into one
+   *
+   * @param other the other stream
+   * @return the merged stream
+   */
+  def merge(other: Stream[T], description: String = null): Stream[T] = {
+    val mergeOp = MergeOp(Option(description).getOrElse("merge"))
+    graph.addVertex(mergeOp)
+    graph.addEdge(thisNode, edge.getOrElse(Direct), mergeOp)
+    graph.addEdge(other.thisNode, other.edge.getOrElse(Shuffle), mergeOp)
+    new Stream[T](graph, mergeOp)
+  }
+
+  /**
+   * Group by function (T => Group)
+   *
+   * For example, we have T type, People(name: String, gender: String, age: Int)
+   * groupBy[People](_.gender) will group the people by gender.
+   *
+   * You can append other combinators after groupBy
+   *
+   * For example,
+   * {{{
+   * Stream[People].groupBy(_.gender).flatMap(..).filter(..).reduce(..)
+   * }}}
+   *
+   * @param fun Group by function
+   * @param parallelism Parallelism level
+   * @param description The description
+   * @return the grouped stream
+   */
+  def groupBy[Group](fun: T => Group, parallelism: Int = 1, description: String = null)
+    : Stream[T] = {
+    val groupOp = GroupByOp(fun, parallelism, Option(description).getOrElse("groupBy"))
+    graph.addVertex(groupOp)
+    graph.addEdge(thisNode, edge.getOrElse(Shuffle), groupOp)
+    new Stream[T](graph, groupOp)
+  }
+
+  /**
+   * Connects with a low level Processor(TaskDescription)
+   *
+   * @param processor a user defined processor
+   * @param parallelism parallelism level
+   * @return new stream after processing with type [R]
+   */
+  def process[R](
+      processor: Class[_ <: Task], parallelism: Int, conf: UserConfig = UserConfig.empty,
+      description: String = null): Stream[R] = {
+    val processorOp = ProcessorOp(processor, parallelism, conf,
+      Option(description).getOrElse("process"))
+    graph.addVertex(processorOp)
+    graph.addEdge(thisNode, edge.getOrElse(Shuffle), processorOp)
+    new Stream[R](graph, processorOp, Some(Shuffle))
+  }
+}
+
+class KVStream[K, V](stream: Stream[Tuple2[K, V]]) {
+  /**
+   * GroupBy key
+   *
+   * Applies to Stream[Tuple2[K,V]]
+   *
+   * @param parallelism the parallelism for this operation
+   * @return the new KV stream
+   */
+  def groupByKey(parallelism: Int = 1): Stream[Tuple2[K, V]] = {
+    stream.groupBy(Stream.getTupleKey[K, V], parallelism, "groupByKey")
+  }
+
+  /**
+   * Sum the value of the tuples
+   *
+   * Apply to Stream[Tuple2[K,V]], V must be of type Number
+   *
+   * For input (key, value1), (key, value2), will generate (key, value1 + value2)
+   * @param numeric the numeric operations
+   * @return the sum stream
+   */
+  def sum(implicit numeric: Numeric[V]): Stream[(K, V)] = {
+    stream.reduce(Stream.sumByValue[K, V](numeric), "sum")
+  }
+}
+
+object Stream {
+
+  def apply[T](graph: Graph[Op, OpEdge], node: Op, edge: Option[OpEdge]): Stream[T] = {
+    new Stream[T](graph, node, edge)
+  }
+
+  /** Key of a key/value tuple. */
+  def getTupleKey[K, V](tuple: Tuple2[K, V]): K = tuple._1
+
+  /** Reducer that keeps the first tuple's key and sums both values with `numeric`. */
+  def sumByValue[K, V](numeric: Numeric[V]): (Tuple2[K, V], Tuple2[K, V]) => Tuple2[K, V]
+    = (tuple1, tuple2) => Tuple2(tuple1._1, numeric.plus(tuple1._2, tuple2._2))
+
+  implicit def streamToKVStream[K, V](stream: Stream[Tuple2[K, V]]): KVStream[K, V] = {
+    new KVStream(stream)
+  }
+
+  implicit class Sink[T](stream: Stream[T]) extends java.io.Serializable {
+    /** Terminates the stream with a DataSink; defaults description to "traversable". */
+    def sink[T](dataSink: DataSink, parallism: Int, conf: UserConfig, description: String)
+      : Stream[T] = {
+      // Option(...) so that a null description falls back to the default; Some(null) would not.
+      val sinkOp = DataSinkOp[T](dataSink, parallism, conf,
+        Option(description).getOrElse("traversable"))
+      stream.graph.addVertex(sinkOp)
+      stream.graph.addEdge(stream.thisNode, Shuffle, sinkOp)
+      new Stream[T](stream.graph, sinkOp)
+    }
+
+    /** Terminates the stream with a sink Task class; defaults description to "sink". */
+    def sink[T](
+        sink: Class[_ <: Task], parallism: Int, conf: UserConfig = UserConfig.empty,
+        description: String = null): Stream[T] = {
+      val sinkOp = ProcessorOp(sink, parallism, conf, Option(description).getOrElse("sink"))
+      stream.graph.addVertex(sinkOp)
+      stream.graph.addEdge(stream.thisNode, Shuffle, sinkOp)
+      new Stream[T](stream.graph, sinkOp)
+    }
+  }
+}
+
+/** DataSink that logs the payload of every message with the task's logger at INFO level. */
+class LoggerSink[T] extends DataSink {
+  var logger: Logger = null
+
+  override def open(context: TaskContext): Unit = {
+    this.logger = context.logger
+  }
+
+  override def write(message: Message): Unit = {
+    logger.info("logging message " + message.msg)
+  }
+
+  override def close(): Unit = ()
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala
new file mode 100644
index 0000000..5027500
--- /dev/null
+++
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.dsl
+
+import akka.actor.ActorSystem
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.streaming.StreamApplication
+import org.apache.gearpump.streaming.dsl.op.{DataSourceOp, Op, OpEdge, ProcessorOp}
+import org.apache.gearpump.streaming.dsl.plan.Planner
+import org.apache.gearpump.streaming.source.DataSource
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
+import org.apache.gearpump.util.Graph
+import org.apache.gearpump.{Message, TimeStamp}
+
+import scala.language.implicitConversions
+
+/**
+ * Example:
+ * {{{
+ * val app = StreamApp("wordCount", context)
+ * val data = "This is a good start, bingo!! bingo!!"
+ * app.source(data.lines.toList, 1, "source").
+ * // word => (word, count)
+ * flatMap(line => line.split("[\\s]+")).map((_, 1)).
+ * // (word, count1), (word, count2) => (word, count1 + count2)
+ * groupBy(kv => kv._1).reduce((kv1, kv2) => (kv1._1, kv1._2 + kv2._2))
+ *
+ * val appId = context.submit(app)
+ * context.close()
+ * }}}
+ *
+ * @param name name of app
+ * @param system actor system used when planning the application
+ * @param userConfig configuration passed on to the planned StreamApplication
+ * @param graph operation graph that all streams of this app add their operations to
+ */
+class StreamApp(
+    val name: String, system: ActorSystem, userConfig: UserConfig, val graph: Graph[Op, OpEdge]) {
+
+  def this(name: String, system: ActorSystem, userConfig: UserConfig) = {
+    this(name, system, userConfig, Graph.empty[Op, OpEdge])
+  }
+
+  /** Turns the operation graph into a processor DAG and wraps it as a StreamApplication. */
+  def plan(): StreamApplication = {
+    implicit val actorSystem = system
+    val planner = new Planner
+    val dag = planner.plan(graph)
+    StreamApplication(name, dag, userConfig)
+  }
+}
+
+object StreamApp {
+  def apply(name: String, context: ClientContext, userConfig: UserConfig = UserConfig.empty)
+    : StreamApp = {
+    new StreamApp(name, context.system, userConfig)
+  }
+
+  implicit def streamAppToApplication(streamApp: StreamApp): StreamApplication = {
+    streamApp.plan()
+  }
+
+  /** Adds source operations to a StreamApp's graph. */
+  implicit class Source(app: StreamApp) extends java.io.Serializable {
+
+    def source[T](dataSource: DataSource, parallism: Int): Stream[T] = {
+      source(dataSource, parallism, UserConfig.empty)
+    }
+
+    def source[T](dataSource: DataSource, parallism: Int, description: String): Stream[T] = {
+      source(dataSource, parallism, UserConfig.empty, description)
+    }
+
+    def source[T](dataSource: DataSource, parallism: Int, conf: UserConfig): Stream[T] = {
+      source(dataSource, parallism, conf, description = null)
+    }
+
+    def source[T](dataSource: DataSource, parallism: Int, conf: UserConfig, description: String)
+      : Stream[T] = {
+      val sourceOp = DataSourceOp(dataSource, parallism, conf, description)
+      app.graph.addVertex(sourceOp)
+      new Stream[T](app.graph, sourceOp)
+    }
+
+    /** Source that emits the elements of `seq` (see CollectionDataSource). */
+    def source[T](seq: Seq[T], parallism: Int, description: String): Stream[T] = {
+      this.source(new CollectionDataSource[T](seq), parallism, UserConfig.empty, description)
+    }
+
+    /** Source backed by a user Task class; defaults description to "source". */
+    def source[T](source: Class[_ <: Task], parallism: Int, conf: UserConfig, description: String)
+      : Stream[T] = {
+      val sourceOp = ProcessorOp(source, parallism, conf, Option(description).getOrElse("source"))
+      app.graph.addVertex(sourceOp)
+      new Stream[T](app.graph, sourceOp)
+    }
+  }
+}
+
+/**
+ * A test message source that emits each element of `seq` once, in order. Once the sequence is
+ * exhausted, read() returns null.
+ */
+class CollectionDataSource[T](seq: Seq[T]) extends DataSource {
+  private lazy val iterator: Iterator[T] = seq.iterator
+
+  override def read(): Message = {
+    if (iterator.hasNext) {
+      Message(iterator.next())
+    } else {
+      null
+    }
+  }
+
+  override def close(): Unit = {}
+
+  override def open(context: TaskContext, startTime: TimeStamp): Unit = {}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
new file mode 100644
index 0000000..6eff20c
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.dsl.javaapi + +import scala.collection.JavaConverters._ + +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.dsl.Stream +import org.apache.gearpump.streaming.javaapi.dsl.functions._ +import org.apache.gearpump.streaming.task.Task + +/** + * Java DSL: a Java-friendly wrapper around a Scala DSL Stream. + * + * Each operator adapts a Java function interface to a Scala function, applies the + * corresponding Stream operator and wraps the result in a new JavaStream. + * + * @param stream the wrapped Scala DSL stream + */ +class JavaStream[T](val stream: Stream[T]) { + + /** FlatMap on stream: the Java result that fn returns for each element is converted with asScala and flattened into the output. */ + def flatMap[R](fn: FlatMapFunction[T, R], description: String): JavaStream[R] = { + new JavaStream[R](stream.flatMap({ t: T => fn(t).asScala }, description)) + } + + /** Map on stream: applies fn to every element. */ + def map[R](fn: MapFunction[T, R], description: String): JavaStream[R] = { + new JavaStream[R](stream.map({ t: T => fn(t) }, description)) + } + + /** Only keeps the messages for which the FilterFunction returns true. */ + def filter(fn: FilterFunction[T], description: String): JavaStream[T] = { + new JavaStream[T](stream.filter({ t: T => fn(t) }, description)) + } + + /** Does aggregation on the stream by combining two elements at a time with fn. */ + def reduce(fn: ReduceFunction[T], description: String): JavaStream[T] = { + new JavaStream[T](stream.reduce({ (t1: T, t2: T) => fn(t1, t2) }, description)) + } + + /** Delegates to Stream.log(); what gets logged is defined by Stream. */ def log(): Unit = { + stream.log() + } + + /** Merges another stream of the same element type into this one. */ + def merge(other: JavaStream[T], description: String): JavaStream[T] = { + new JavaStream[T](stream.merge(other.stream, description)) + } + + /** + * Groups the stream by the key that fn returns and turns it into a list of sub-streams. + * Operations chained after groupBy apply to the sub-streams.
+ * + * @param fn function that extracts the group key from each element + * @param parallelism passed through unchanged to Stream.groupBy + * @param description operator description + */ + def groupBy[Group](fn: GroupByFunction[T, Group], parallelism: Int, description: String) + : JavaStream[T] = { + new JavaStream[T](stream.groupBy({t: T => fn(t)}, parallelism, description)) + } + + /** Adds a low-level Processor (a Task class) to process messages. R is chosen by the caller; nothing in this signature ties it to the processor's output. */ + def process[R]( + processor: Class[_ <: Task], parallelism: Int, conf: UserConfig, description: String) + : JavaStream[R] = { + new JavaStream[R](stream.process(processor, parallelism, conf, description)) + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala new file mode 100644 index 0000000..0d841be --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.gearpump.streaming.dsl.javaapi + +import java.util.Collection +import scala.collection.JavaConverters._ + +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.cluster.client.ClientContext +import org.apache.gearpump.streaming.dsl.{CollectionDataSource, StreamApp} +import org.apache.gearpump.streaming.source.DataSource + +/** Java entry point of the DSL: builds a Scala StreamApp from name, context and userConfig, exposes its source builders as JavaStreams and submits it on run(). */ class JavaStreamApp(name: String, context: ClientContext, userConfig: UserConfig) { + + /* Underlying Scala DSL app; every source added here becomes a vertex of its operator graph. */ private val streamApp = StreamApp(name, context, userConfig) + + /** Source that emits every element of a Java collection once, via CollectionDataSource. NOTE(review): asScala.toSeq on a java.util.Collection may produce a lazily evaluated Seq (a Stream in Scala 2.11/2.12) that still reads from the Java collection; toList would give a strict snapshot. Confirm before changing. */ def source[T](collection: Collection[T], parallelism: Int, + conf: UserConfig, description: String): JavaStream[T] = { + val dataSource = new CollectionDataSource(collection.asScala.toSeq) + source(dataSource, parallelism, conf, description) + } + + /** Adds dataSource as a source of the underlying StreamApp and wraps the resulting Stream. */ def source[T](dataSource: DataSource, parallelism: Int, + conf: UserConfig, description: String): JavaStream[T] = { + new JavaStream[T](streamApp.source(dataSource, parallelism, conf, description)) + } + + /** Submits the app through the ClientContext; presumably the implicit StreamApp-to-StreamApplication conversion in the StreamApp companion plans it first (confirm). */ def run(): Unit = { + context.submit(streamApp) + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/op/OP.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/op/OP.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/op/OP.scala new file mode 100644 index 0000000..49d9dec --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/op/OP.scala @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.dsl.op + +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.sink.DataSink +import org.apache.gearpump.streaming.source.DataSource +import org.apache.gearpump.streaming.task.Task + +/** + * Operators for the DSL. + * + * Every operator carries a description and a UserConfig. + */ +sealed trait Op { + def description: String + def conf: UserConfig +} + +/** + * When translated to a running DAG, a SlaveOp can be attached to a MasterOp or to another + * SlaveOp. "Attached" means running in the same Actor. + */ +trait SlaveOp[T] extends Op + +/** Applies fun to each element and flattens the results. */ case class FlatMapOp[T, R]( + fun: (T) => TraversableOnce[R], description: String, conf: UserConfig = UserConfig.empty) + extends SlaveOp[T] + +/** Aggregates elements by combining two at a time with fun. */ case class ReduceOp[T](fun: (T, T) => T, description: String, conf: UserConfig = UserConfig.empty) + extends SlaveOp[T] + +/** An operator that SlaveOps can be attached to (see SlaveOp). */ trait MasterOp extends Op + +/** A MasterOp that, in every implementation visible here, also takes an explicit parallelism. */ trait ParameterizedOp[T] extends MasterOp + +/** Merges upstream streams into one (compare Stream.merge). */ case class MergeOp(description: String, override val conf: UserConfig = UserConfig.empty) + extends MasterOp + +/** Groups elements by the key that fun returns, with the given parallelism. */ case class GroupByOp[T, R]( + fun: T => R, parallelism: Int, description: String, + override val conf: UserConfig = UserConfig.empty) + extends ParameterizedOp[T] + +/** Runs a user-supplied low-level Task class as an operator. */ case class ProcessorOp[T <: Task]( + processor: Class[T], parallelism: Int, conf: UserConfig, description: String) + extends ParameterizedOp[T] + +/** Source operator backed by a DataSource. */ case class DataSourceOp[T]( + dataSource: DataSource, parallelism: Int, conf: UserConfig, description: String) + extends ParameterizedOp[T] + +/** Sink operator backed by a DataSink. */ case class DataSinkOp[T]( + dataSink: DataSink, parallelism: Int, conf: UserConfig, description: String) + extends
+ ParameterizedOp[T] + +/** + * Contains operators which can be chained into a single one. + * + * For example, flatmap().map().reduce() can be chained into a single operator as + * no data shuffling is required. + * @param ops list of operations; must be non-empty, because head and last call List.head / List.last + */ +case class OpChain(ops: List[Op]) extends Op { + /** First op of the chain (throws on an empty chain). */ def head: Op = ops.head + /** Last op of the chain (throws on an empty chain). */ def last: Op = ops.last + + /** Always null: a chain has no description of its own. */ def description: String = null + + /** Merged conf of all ops in the chain. */ override def conf: UserConfig = { + // The head's conf has priority: ops are folded from the last one to the head, so the head's conf is merged last (assumes withConfig lets the argument's entries override existing ones) + ops.reverse.foldLeft(UserConfig.empty) { (conf, op) => + conf.withConfig(op.conf) + } + } +} + +/** Kind of edge between two DSL operators. */ trait OpEdge + +/** + * The upstream op and the downstream op do not require a network data shuffle. + * + * For example, map and flatmap operations do not require a network shuffle, so Direct + * is used to represent their relation with upstream operators. + */ +case object Direct extends OpEdge + +/** + * The upstream op and the downstream op DO require a network data shuffle, i.e. messages + * are repartitioned between them. + * + * For example, an operator that regroups messages by key (compare GroupByPartitioner) + * presumably needs Shuffle rather than Direct to connect to its upstream operators. + */ +case object Shuffle extends OpEdge + http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupbyPartitioner.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupbyPartitioner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupbyPartitioner.scala new file mode 100644 index 0000000..b2e2932 --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupbyPartitioner.scala @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.dsl.partitioner + +import org.apache.gearpump.Message +import org.apache.gearpump.partitioner.UnicastPartitioner + +/** + * Partitions messages by applying a group-by function first. + * + * For example: + * {{{ + * case class People(name: String, gender: String) + * + * object Test{ + * + * val groupBy: (People => String) = people => people.gender + * val partitioner = new GroupByPartitioner(groupBy) + * } + * }}} + * + * @tparam T payload type of incoming messages; msg.msg is cast to T without a runtime check + * @tparam GROUP type of the group-by key + * @param groupBy First applies the groupBy function to the message payload, then uses the hashCode + * of the result to pick the partition. The key type must define a consistent hashCode(), and + * groupBy must not return null. + * NOTE(review): groupBy defaults to null, so getPartition throws a NullPointerException unless + * a function is supplied. + */ +class GroupByPartitioner[T, GROUP](groupBy: T => GROUP = null) extends UnicastPartitioner { + /** Returns the group key's hashCode with the sign bit cleared, modulo partitionNum, so negative hash codes still map into [0, partitionNum). currentPartitionId is not used. Throws ArithmeticException if partitionNum is 0. */ override def getPartition(msg: Message, partitionNum: Int, currentPartitionId: Int): Int = { + val hashCode = groupBy(msg.msg.asInstanceOf[T]).hashCode() + (hashCode & Integer.MAX_VALUE) % partitionNum + } +} \ No newline at end of file
