/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.gearpump.streaming.dsl.plan

import scala.collection.TraversableOnce

import akka.actor.ActorSystem
import org.slf4j.Logger

import org.apache.gearpump._
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.Constants._
import org.apache.gearpump.streaming.Processor
import org.apache.gearpump.streaming.Processor.DefaultProcessor
import org.apache.gearpump.streaming.dsl.op._
import org.apache.gearpump.streaming.dsl.plan.OpTranslator._
import org.apache.gearpump.streaming.sink.DataSink
import org.apache.gearpump.streaming.source.DataSource
import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
import org.apache.gearpump.util.LogUtil

/**
 * Translates an [[OpChain]] (a chain of DSL operators) into a low-level
 * [[Processor]] that can be scheduled as part of the streaming DAG.
 */
class OpTranslator extends java.io.Serializable {
  val LOG: Logger = LogUtil.getLogger(getClass)

  /**
   * Translates the given operator chain into a Processor.
   *
   * The head of the chain decides the task type (source, group-by, merge,
   * user processor, sink or plain transform); the tail of the chain is folded
   * into a single [[SingleInputFunction]] stored in the task's user config.
   */
  def translate(ops: OpChain)(implicit system: ActorSystem): Processor[_ <: Task] = {

    val baseConfig = ops.conf

    ops.ops.head match {
      case op: MasterOp =>
        val tail = ops.ops.tail
        val func = toFunction(tail)
        val userConfig = baseConfig.withValue(GEARPUMP_STREAMING_OPERATOR, func)

        op match {
          case DataSourceOp(dataSource, parallism, conf, description) =>
            Processor[SourceTask[Object, Object]](parallism,
              description = description + "." + func.description,
              userConfig.withValue(GEARPUMP_STREAMING_SOURCE, dataSource))
          case groupby@GroupByOp(_, parallism, description, _) =>
            Processor[GroupByTask[Object, Object, Object]](parallism,
              description = description + "." + func.description,
              userConfig.withValue(GEARPUMP_STREAMING_GROUPBY_FUNCTION, groupby))
          case merge: MergeOp =>
            // Merge is always single-parallelism: all upstream outputs funnel
            // into one TransformTask.
            Processor[TransformTask[Object, Object]](1,
              description = op.description + "." + func.description,
              userConfig)
          case ProcessorOp(processor, parallism, conf, description) =>
            DefaultProcessor(parallism,
              description = description + "." + func.description,
              userConfig, processor)
          case DataSinkOp(dataSink, parallelism, conf, description) =>
            Processor[SinkTask[Object]](parallelism,
              // Fix: use the same "." separator between the op description and
              // the chained function description as every other case above.
              description = description + "." + func.description,
              userConfig.withValue(GEARPUMP_STREAMING_SINK, dataSink))
        }
      case op: SlaveOp[_] =>
        // A chain that starts with a slave op becomes a plain transform task.
        val func = toFunction(ops.ops)
        val userConfig = baseConfig.withValue(GEARPUMP_STREAMING_OPERATOR, func)

        Processor[TransformTask[Object, Object]](1,
          description = func.description,
          taskConf = userConfig)
      case chain: OpChain =>
        // Chains are flattened before translation; a nested chain is a planner bug.
        throw new RuntimeException("Not supposed to be called!")
    }
  }

  /**
   * Folds a list of slave ops (flatMap / reduce) into one composed
   * [[SingleInputFunction]], starting from an identity (dummy) function.
   */
  private def toFunction(ops: List[Op]): SingleInputFunction[Object, Object] = {
    val func: SingleInputFunction[Object, Object] = new DummyInputFunction[Object]()
    val totalFunction = ops.foldLeft(func) { (fun, op) =>

      val opFunction = op match {
        case flatmap: FlatMapOp[Object @unchecked, Object @unchecked] =>
          new FlatMapFunction(flatmap.fun, flatmap.description)
        case reduce: ReduceOp[Object @unchecked] =>
          new ReduceFunction(reduce.fun, reduce.description)
        case _ =>
          throw new RuntimeException("Not supposed to be called!")
      }
      fun.andThen(opFunction.asInstanceOf[SingleInputFunction[Object, Object]])
    }
    totalFunction.asInstanceOf[SingleInputFunction[Object, Object]]
  }
}

object OpTranslator {

  /** A serializable, composable single-input stream transformation. */
  trait SingleInputFunction[IN, OUT] extends Serializable {
    def process(value: IN): TraversableOnce[OUT]

    /** Composes this function with `other`, feeding this function's output into it. */
    def andThen[OUTER](other: SingleInputFunction[OUT, OUTER]): SingleInputFunction[IN, OUTER] = {
      new AndThen(this, other)
    }

    def description: String
  }

  /** Identity placeholder; composing with it yields the other function unchanged. */
  class DummyInputFunction[T] extends SingleInputFunction[T, T] {
    override def andThen[OUTER](other: SingleInputFunction[T, OUTER])
      : SingleInputFunction[T, OUTER] = {
      other
    }

    // Should never be called: the dummy is always replaced by andThen.
    override def process(value: T): TraversableOnce[T] = None

    override def description: String = ""
  }

  /** Composition of two SingleInputFunctions: first, then second. */
  class AndThen[IN, MIDDLE, OUT](
      first: SingleInputFunction[IN, MIDDLE], second: SingleInputFunction[MIDDLE, OUT])
    extends SingleInputFunction[IN, OUT] {

    override def process(value: IN): TraversableOnce[OUT] = {
      first.process(value).flatMap(second.process(_))
    }

    // Joins the two descriptions with "."; null if either side is null.
    override def description: String = {
      Option(first.description).flatMap { description =>
        Option(second.description).map(description + "." + _)
      }.getOrElse(null)
    }
  }

  /** Wraps a user flatMap function. */
  class FlatMapFunction[IN, OUT](fun: IN => TraversableOnce[OUT], descriptionMessage: String)
    extends SingleInputFunction[IN, OUT] {

    override def process(value: IN): TraversableOnce[OUT] = {
      fun(value)
    }

    override def description: String = {
      this.descriptionMessage
    }
  }

  /**
   * Wraps a user reduce function. Keeps running state across invocations and
   * emits the accumulated value for every input.
   */
  class ReduceFunction[T](fun: (T, T) => T, descriptionMessage: String)
    extends SingleInputFunction[T, T] {

    // Running accumulator; null until the first element arrives.
    private var state: Any = null

    override def process(value: T): TraversableOnce[T] = {
      if (state == null) {
        state = value
      } else {
        state = fun(state.asInstanceOf[T], value)
      }
      Some(state.asInstanceOf[T])
    }

    override def description: String = descriptionMessage
  }

  /**
   * Task that partitions messages by a group-by key and maintains one operator
   * instance (and thus independent state) per group.
   */
  class GroupByTask[IN, GROUP, OUT](
      groupBy: IN => GROUP, taskContext: TaskContext, userConf: UserConfig)
    extends Task(taskContext, userConf) {

    def this(taskContext: TaskContext, userConf: UserConfig) = {
      this(userConf.getValue[GroupByOp[IN, GROUP]](
        GEARPUMP_STREAMING_GROUPBY_FUNCTION)(taskContext.system).get.fun,
        taskContext, userConf)
    }

    // One operator instance per observed group key.
    private var groups = Map.empty[GROUP, SingleInputFunction[IN, OUT]]

    override def onStart(startTime: StartTime): Unit = {
    }

    override def onNext(msg: Message): Unit = {
      val time = msg.timestamp

      val group = groupBy(msg.msg.asInstanceOf[IN])
      if (!groups.contains(group)) {
        // Deserializing from config yields a fresh operator (fresh state) per group.
        val operator =
          userConf.getValue[SingleInputFunction[IN, OUT]](GEARPUMP_STREAMING_OPERATOR).get
        groups += group -> operator
      }

      val operator = groups(group)

      operator.process(msg.msg.asInstanceOf[IN]).foreach { result =>
        taskContext.output(new Message(result.asInstanceOf[AnyRef], time))
      }
    }
  }

  /**
   * Task that pulls messages from a [[DataSource]], optionally applies the
   * chained operator, and drives itself with self-messages.
   */
  class SourceTask[T, OUT](
      source: DataSource, operator: Option[SingleInputFunction[T, OUT]], taskContext: TaskContext,
      userConf: UserConfig)
    extends Task(taskContext, userConf) {

    def this(taskContext: TaskContext, userConf: UserConfig) = {
      this(
        userConf.getValue[DataSource](GEARPUMP_STREAMING_SOURCE)(taskContext.system).get,
        userConf.getValue[SingleInputFunction[T, OUT]](GEARPUMP_STREAMING_OPERATOR)(
          taskContext.system),
        taskContext, userConf)
    }

    override def onStart(startTime: StartTime): Unit = {
      source.open(taskContext, startTime.startTime)
      // Kick off the self-driven read loop.
      self ! Message("start", System.currentTimeMillis())
    }

    override def onNext(msg: Message): Unit = {
      val time = System.currentTimeMillis()
      // source.read() may return null when no data is available; skip in that case.
      Option(source.read()).foreach { readMsg =>
        operator match {
          case Some(op) =>
            op match {
              case bad: DummyInputFunction[T] =>
                // Identity chain: forward the message untouched.
                taskContext.output(readMsg)
              case _ =>
                op.process(readMsg.msg.asInstanceOf[T]).foreach(result => {
                  taskContext.output(new Message(result.asInstanceOf[AnyRef], time))
                })
            }
          case None =>
            taskContext.output(readMsg)
        }
      }

      // Schedule the next read.
      self ! Message("next", System.currentTimeMillis())
    }

    override def onStop(): Unit = {
      source.close()
    }
  }

  /** Task that applies the chained operator (if any) to each incoming message. */
  class TransformTask[IN, OUT](
      operator: Option[SingleInputFunction[IN, OUT]], taskContext: TaskContext,
      userConf: UserConfig) extends Task(taskContext, userConf) {

    def this(taskContext: TaskContext, userConf: UserConfig) = {
      this(userConf.getValue[SingleInputFunction[IN, OUT]](
        GEARPUMP_STREAMING_OPERATOR)(taskContext.system), taskContext, userConf)
    }

    override def onStart(startTime: StartTime): Unit = {
    }

    override def onNext(msg: Message): Unit = {
      val time = msg.timestamp

      operator match {
        case Some(op) =>
          op.process(msg.msg.asInstanceOf[IN]).foreach { result =>
            taskContext.output(new Message(result.asInstanceOf[AnyRef], time))
          }
        case None =>
          taskContext.output(new Message(msg.msg, time))
      }
    }
  }

  /** Task that writes every incoming message to a [[DataSink]]. */
  class SinkTask[T](dataSink: DataSink, taskContext: TaskContext, userConf: UserConfig)
    extends Task(taskContext, userConf) {

    def this(taskContext: TaskContext, userConf: UserConfig) = {
      this(userConf.getValue[DataSink](GEARPUMP_STREAMING_SINK)(taskContext.system).get,
        taskContext, userConf)
    }

    override def onStart(startTime: StartTime): Unit = {
      dataSink.open(taskContext)
    }

    override def onNext(msg: Message): Unit = {
      dataSink.write(msg)
    }

    override def onStop(): Unit = {
      dataSink.close()
    }
  }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.gearpump.streaming.dsl.plan

import akka.actor.ActorSystem

import org.apache.gearpump.partitioner.{CoLocationPartitioner, HashPartitioner, Partitioner}
import org.apache.gearpump.streaming.Processor
import org.apache.gearpump.streaming.dsl.op._
import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitioner
import org.apache.gearpump.streaming.task.Task
import org.apache.gearpump.util.Graph

class Planner {

  /**
   * Converts a DAG of Op into a DAG of Processor (the low-level Graph API).
   * Adjacent ops connected by a Direct edge are first chained together,
   * then each chain is translated and each edge mapped to a partitioner.
   */
  def plan(dag: Graph[Op, OpEdge])(implicit system: ActorSystem)
    : Graph[Processor[_ <: Task], _ <: Partitioner] = {

    val translator = new OpTranslator()

    val chained = optimize(dag)
    val partitioned = chained.mapEdge { (from, opEdge, to) =>
      opEdge match {
        case Shuffle =>
          // A shuffle into a group-by keeps records of the same key together;
          // any other shuffle falls back to hash partitioning.
          to.head match {
            case groupBy: GroupByOp[Any @unchecked, Any @unchecked] =>
              new GroupByPartitioner(groupBy.fun)
            case _ => new HashPartitioner
          }
        case Direct =>
          new CoLocationPartitioner
      }
    }
    partitioned.mapVertex(chain => translator.translate(chain))
  }

  /** Wraps every op in a single-element chain, then merges Direct-connected neighbors. */
  private def optimize(dag: Graph[Op, OpEdge]): Graph[OpChain, OpEdge] = {
    val chainGraph = dag.mapVertex(op => OpChain(List(op)))

    // Visit vertices in reverse topological order so downstream chains are
    // merged before their upstream neighbors.
    val ordered = chainGraph.topologicalOrderWithCirclesIterator.toList.reverse
    ordered.foreach { vertex =>
      chainGraph.outgoingEdgesOf(vertex).foreach {
        case (from, _, to) => merge(chainGraph, from, to)
      }
    }
    chainGraph
  }

  /**
   * Collapses node1 and node2 into a single chain when node1 has exactly one
   * successor, node2 exactly one predecessor, the edge is Direct, and node1 is
   * not a user processor (processors never merge with downstream operators).
   */
  private def merge(dag: Graph[OpChain, OpEdge], node1: OpChain, node2: OpChain)
    : Graph[OpChain, OpEdge] = {
    val mergeable = dag.outDegreeOf(node1) == 1 &&
      dag.inDegreeOf(node2) == 1 &&
      !node1.head.isInstanceOf[ProcessorOp[_ <: Task]]
    if (mergeable) {
      val (_, edge, _) = dag.outgoingEdgesOf(node1).head
      if (edge == Direct) {
        val combined = OpChain(node1.ops ++ node2.ops)
        dag.addVertex(combined)

        // Rewire node1's predecessors to the combined chain.
        dag.incomingEdgesOf(node1).foreach {
          case (src, inEdge, _) => dag.addEdge(src, inEdge, combined)
        }
        // Rewire node2's successors from the combined chain.
        dag.outgoingEdgesOf(node2).foreach {
          case (_, outEdge, dst) => dag.addEdge(combined, outEdge, dst)
        }

        // Drop the now-merged vertices.
        dag.removeVertex(node1)
        dag.removeVertex(node2)
      }
    }
    dag
  }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.gearpump.streaming.executor

import java.lang.management.ManagementFactory
import scala.concurrent.duration._

import akka.actor.SupervisorStrategy.Resume
import akka.actor._
import com.typesafe.config.Config
import org.apache.commons.lang.exception.ExceptionUtils
import org.slf4j.Logger

import org.apache.gearpump.cluster.worker.WorkerId
import org.apache.gearpump.cluster.{ClusterConfig, ExecutorContext, UserConfig}
import org.apache.gearpump.metrics.Metrics.ReportMetrics
import org.apache.gearpump.metrics.{JvmMetricsSet, Metrics, MetricsReporterService}
import org.apache.gearpump.serializer.SerializationFramework
import org.apache.gearpump.streaming.AppMasterToExecutor.{MsgLostException, TasksChanged, TasksLaunched, _}
import org.apache.gearpump.streaming.ExecutorToAppMaster.{MessageLoss, RegisterExecutor, RegisterTask, UnRegisterTask}
import org.apache.gearpump.streaming.ProcessorId
import org.apache.gearpump.streaming.executor.Executor._
import org.apache.gearpump.streaming.executor.TaskLauncher.TaskArgument
import org.apache.gearpump.streaming.task.{Subscriber, TaskId}
import org.apache.gearpump.transport.{Express, HostPort}
import org.apache.gearpump.util.Constants._
import org.apache.gearpump.util.{ActorUtil, Constants, LogUtil, TimeOutScheduler}

/**
 * Executor is a child of AppMaster.
 * It usually represents a JVM process. It is a container for all tasks.
 */

// TODO: What if Executor stuck in state DynamicDag and cannot get out???
// For example, due to some message loss when there is network glitch.
// Executor will hang there for ever???
//
class Executor(executorContext: ExecutorContext, userConf: UserConfig, launcher: ITaskLauncher)
  extends Actor with TimeOutScheduler {

  def this(executorContext: ExecutorContext, userConf: UserConfig) = {
    this(executorContext, userConf, TaskLauncher(executorContext, userConf))
  }

  import context.dispatcher
  import executorContext.{appId, appMaster, executorId, resource, worker}

  private val LOG: Logger = LogUtil.getLogger(getClass, executor = executorId, app = appId)

  private implicit val timeOut = FUTURE_TIMEOUT
  private val address = ActorUtil.getFullPath(context.system, self.path)
  private val systemConfig = context.system.settings.config
  private val serializerPool = getSerializerPool()
  private val taskDispatcher = systemConfig.getString(Constants.GEARPUMP_TASK_DISPATCHER)

  private var state = State.ACTIVE
  // State transition start, in unix time
  private var transitionStart = 0L
  // State transition end, in unix time
  private var transitionEnd = 0L
  // Warn when a transition lingers longer than this many milliseconds
  private val transitWarningThreshold = 5000

  // Starts health check Ticks
  self ! HealthCheck

  LOG.info(s"Executor $executorId has been started, start to register itself...")
  LOG.info(s"Executor actor path: ${ActorUtil.getFullPath(context.system, self.path)}")

  appMaster ! RegisterExecutor(self, executorId, resource, worker)
  context.watch(appMaster)

  private var tasks = Map.empty[TaskId, ActorRef]
  private val taskArgumentStore = new TaskArgumentStore()

  val express = Express(context.system)

  val metricsEnabled = systemConfig.getBoolean(GEARPUMP_METRIC_ENABLED)

  if (metricsEnabled) {
    // Registers jvm metrics
    Metrics(context.system).register(new JvmMetricsSet(s"app$appId.executor$executorId"))

    val metricsReportService = context.actorOf(Props(new MetricsReporterService(
      Metrics(context.system))))
    appMaster.tell(ReportMetrics, metricsReportService)
  }

  private val NOT_INITIALIZED = -1
  def receive: Receive = applicationReady(dagVersion = NOT_INITIALIZED)

  /** Reverse lookup of the TaskId owning an actor, used by the supervisor strategy. */
  private def getTaskId(actorRef: ActorRef): Option[TaskId] = {
    tasks.find(_._2 == actorRef).map(_._1)
  }

  // Any task failure is reported to the AppMaster as message loss so the
  // application can be replayed; the failing task actor itself is resumed.
  override val supervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1.minute) {
      case _: MsgLostException =>
        val taskId = getTaskId(sender)
        val cause = s"We got MessageLossException from task ${getTaskId(sender)}, " +
          s"replaying application..."
        LOG.error(cause)
        taskId.foreach(appMaster ! MessageLoss(executorId, _, cause))
        Resume
      case ex: Throwable =>
        val taskId = getTaskId(sender)
        val errorMsg = s"We got ${ex.getClass.getName} from $taskId, we will treat it as" +
          s" MessageLoss, so that the system will replay all lost message"
        LOG.error(errorMsg, ex)
        val detailErrorMsg = errorMsg + "\n" + ExceptionUtils.getStackTrace(ex)
        taskId.foreach(appMaster ! MessageLoss(executorId, _, detailErrorMsg))
        Resume
    }

  /** Launches a single task actor via the injected launcher. */
  private def launchTask(taskId: TaskId, argument: TaskArgument): ActorRef = {
    launcher.launch(List(taskId), argument, context, serializerPool, taskDispatcher).values.head
  }

  /** Throws DagVersionMismatchException when the message's dag version is stale/ahead. */
  private def assertVersion(expectVersion: Int, version: Int, clue: Any): Unit = {
    if (expectVersion != version) {
      val errorMessage = s"Version mismatch: we expect dag version $expectVersion, " +
        s"but get $version; clue: $clue"
      LOG.error(errorMessage)
      throw new DagVersionMismatchException(errorMessage)
    }
  }

  /**
   * Dynamic DAG transition, phase 1: launch/change tasks and collect task
   * registrations until the AppMaster broadcasts the task locations.
   */
  def dynamicDagPhase1(
      dagVersion: Int, launched: List[TaskId], changed: List[ChangeTask], registered: List[TaskId])
    : Receive = {
    state = State.DYNAMIC_DAG_PHASE1
    box({
      case launch@LaunchTasks(taskIds, version, processorDescription,
        subscribers: List[Subscriber]) => {
        assertVersion(dagVersion, version, clue = launch)

        LOG.info(s"Launching Task $taskIds for app: $appId")
        val taskArgument = TaskArgument(version, processorDescription, subscribers)
        taskIds.foreach(taskArgumentStore.add(_, taskArgument))
        val newAdded = launcher.launch(taskIds, taskArgument, context, serializerPool,
          taskDispatcher)
        newAdded.foreach { newAddedTask =>
          context.watch(newAddedTask._2)
        }
        tasks ++= newAdded
        sender ! TasksLaunched
        context.become(dynamicDagPhase1(version, launched ++ taskIds, changed, registered))
      }
      case change@ChangeTasks(taskIds, version, life, subscribers) =>
        assertVersion(dagVersion, version, clue = change)

        LOG.info(s"Change Tasks $taskIds for app: $appId, verion: $life, $dagVersion, $subscribers")

        val newChangedTasks = taskIds.map { taskId =>
          // Refresh the stored TaskArgument with the new life and subscribers.
          for (taskArgument <- taskArgumentStore.get(dagVersion, taskId)) {
            val processorDescription = taskArgument.processorDescription.copy(life = life)
            taskArgumentStore.add(taskId, TaskArgument(dagVersion, processorDescription,
              subscribers))
          }
          ChangeTask(taskId, dagVersion, life, subscribers)
        }
        sender ! TasksChanged(taskIds)
        context.become(dynamicDagPhase1(dagVersion, launched, changed ++ newChangedTasks,
          registered))

      case locations@TaskLocationsReady(taskLocations, version) =>
        LOG.info(s"TaskLocations Ready...")
        assertVersion(dagVersion, version, clue = locations)

        // Check whether all tasks have been registered.
        if ((launched.toSet -- registered.toSet).isEmpty) {
          // Confirm all tasks have been registered.
          val result = taskLocations.locations.filter {
            location => !location._1.equals(express.localHost)
          }.flatMap { kv =>
            val (host, taskIdList) = kv
            taskIdList.map(taskId => (TaskId.toLong(taskId), host))
          }

          // Capture sender before the async callbacks below run.
          val replyTo = sender
          express.startClients(taskLocations.locations.keySet).foreach { _ =>
            express.remoteAddressMap.send(result)
            express.remoteAddressMap.future().foreach { _ =>
              LOG.info(s"sending TaskLocationsReceived back to appmaster")
              replyTo ! TaskLocationsReceived(version, executorId)
            }
          }
          context.become(dynamicDagPhase2(dagVersion, launched, changed))
        } else {
          LOG.error("Inconsistency between AppMaser and Executor! AppMaster thinks DynamicDag " +
            "transition is ready, while Executor have not get all tasks registered, " +
            "that task will not be functional...")

          // Reject TaskLocations...
          val missedTasks = (launched.toSet -- registered.toSet).toList
          val errorMsg = "We have not received TaskRegistered for following tasks: " +
            missedTasks.mkString(", ")
          LOG.error(errorMsg)
          sender ! TaskLocationsRejected(dagVersion, executorId, errorMsg, null)
          // Stays with current status...
        }

      case confirm: TaskRegistered =>
        tasks.get(confirm.taskId).foreach { actorRef =>
          tasks += confirm.taskId -> actorRef
          actorRef forward confirm
        }
        context.become(dynamicDagPhase1(dagVersion, launched, changed,
          registered :+ confirm.taskId))

      case rejected: TaskRejected =>
        // Means this task should not exist...
        tasks.get(rejected.taskId).foreach(_ ! PoisonPill)
        tasks -= rejected.taskId
        LOG.error(s"Task ${rejected.taskId} is rejected by AppMaster, shutting down it...")

      case register: RegisterTask =>
        appMaster ! register
    })
  }

  /**
   * Dynamic DAG transition, phase 2: wait for StartAllTasks, then start/change
   * everything and return to the ready state.
   */
  def dynamicDagPhase2(dagVersion: Int, launched: List[TaskId], changed: List[ChangeTask])
    : Receive = {
    LOG.info("Transit to dynamic Dag Phase2")
    state = State.DYNAMIC_DAG_PHASE2
    box {
      case startAll@StartAllTasks(version) =>
        LOG.info(s"Start All Tasks...")
        assertVersion(dagVersion, version, clue = startAll)

        launched.foreach(taskId => tasks.get(taskId).foreach(_ ! StartTask(taskId)))
        changed.foreach(changeTask => tasks.get(changeTask.taskId).foreach(_ ! changeTask))

        // The new DAG is live; drop obsolete and newer task arguments.
        taskArgumentStore.removeNewerVersion(dagVersion)
        taskArgumentStore.removeObsoleteVersion
        context.become(applicationReady(dagVersion))
    }
  }

  /** Steady state: accept dynamic-DAG transitions and task lifecycle messages. */
  def applicationReady(dagVersion: Int): Receive = {
    state = State.ACTIVE
    transitionEnd = System.currentTimeMillis()

    if (dagVersion != NOT_INITIALIZED) {
      LOG.info("Transit to state Application Ready. This transition takes " +
        (transitionEnd - transitionStart) + " milliseconds")
    }
    box {
      case start: StartDynamicDag =>
        LOG.info("received StartDynamicDag")
        if (start.dagVersion > dagVersion) {
          transitionStart = System.currentTimeMillis()
          LOG.info(s"received $start, Executor transit to dag version: ${start.dagVersion} from " +
            s"current version $dagVersion")
          context.become(dynamicDagPhase1(start.dagVersion, List.empty[TaskId],
            List.empty[ChangeTask], List.empty[TaskId]))
        }
      case launch: LaunchTasks =>
        if (launch.dagVersion > dagVersion) {
          transitionStart = System.currentTimeMillis()
          LOG.info(s"received $launch, Executor transit to dag " +
            s"version: ${launch.dagVersion} from current version $dagVersion")
          context.become(dynamicDagPhase1(launch.dagVersion, List.empty[TaskId],
            List.empty[ChangeTask], List.empty[TaskId]))
          // Re-deliver so the phase-1 handler processes the launch itself.
          self forward launch
        }

      case change: ChangeTasks =>
        if (change.dagVersion > dagVersion) {
          transitionStart = System.currentTimeMillis()
          LOG.info(s"received $change, Executor transit to dag version: ${change.dagVersion} from" +
            s" current version $dagVersion")
          context.become(dynamicDagPhase1(change.dagVersion, List.empty[TaskId],
            List.empty[ChangeTask], List.empty[TaskId]))
          self forward change
        }

      case StopTask(taskId) =>
        // Old soldiers never die, they just fade away ;)
        val fadeAwayTask = tasks.get(taskId)
        if (fadeAwayTask.isDefined) {
          context.stop(fadeAwayTask.get)
        }
        tasks -= taskId

      case unRegister@UnRegisterTask(taskId, _) =>
        // Sends UnRegisterTask to AppMaster
        appMaster ! unRegister
    }
  }

  /**
   * Recovery state: wait for all failed task actors to stop, then relaunch
   * them and re-enter dynamic-DAG phase 1.
   */
  def restartingTasks(dagVersion: Int, remain: Int, needRestart: List[TaskId]): Receive = {
    state = State.RECOVERY
    box {
      case TaskStopped(actor) =>
        for (taskId <- getTaskId(actor)) {
          if (taskArgumentStore.get(dagVersion, taskId).nonEmpty) {
            val newNeedRestart = needRestart :+ taskId
            val newRemain = remain - 1
            if (newRemain == 0) {
              val newRestarted = newNeedRestart.map { taskId_ =>
                val taskActor = launchTask(taskId_, taskArgumentStore.get(dagVersion, taskId_).get)
                context.watch(taskActor)
                taskId_ -> taskActor
              }.toMap

              tasks = newRestarted
              context.become(dynamicDagPhase1(dagVersion, newNeedRestart, List.empty[ChangeTask],
                List.empty[TaskId]))
            } else {
              context.become(restartingTasks(dagVersion, newRemain, newNeedRestart))
            }
          }
        }
    }
  }

  // AppMaster death ends this executor; a dead task actor is funneled into TaskStopped.
  val terminationWatch: Receive = {
    case Terminated(actor) =>
      if (actor.compareTo(appMaster) == 0) {
        LOG.info(s"AppMaster ${appMaster.path.toString} is terminated, shutting down current " +
          s"executor $appId, $executorId")
        context.stop(self)
      } else {
        self ! TaskStopped(actor)
      }
  }

  def onRestartTasks: Receive = {
    case RestartTasks(dagVersion) =>
      LOG.info(s"Executor received restart tasks")
      val tasksToRestart = tasks.keys.count(taskArgumentStore.get(dagVersion, _).nonEmpty)
      express.remoteAddressMap.send(Map.empty[Long, HostPort])
      context.become(restartingTasks(dagVersion, remain = tasksToRestart,
        needRestart = List.empty[TaskId]))

      // Kill every task actor; their Terminated notifications drive the restart.
      tasks.values.foreach(_ ! PoisonPill)
  }

  // Messages handled in every state.
  def executorService: Receive = terminationWatch orElse onRestartTasks orElse {
    case taskChanged: TaskChanged =>
      // Skip
    case get: GetExecutorSummary =>
      val logFile = LogUtil.applicationLogDir(systemConfig)
      val processorTasks = tasks.keySet.groupBy(_.processorId).mapValues(_.toList).view.force
      sender ! ExecutorSummary(
        executorId,
        worker.workerId,
        address,
        logFile.getAbsolutePath,
        state,
        tasks.size,
        processorTasks,
        jvmName = ManagementFactory.getRuntimeMXBean().getName())

    case query: QueryExecutorConfig =>
      sender ! ExecutorConfig(ClusterConfig.filterOutDefaultConfig(systemConfig))
    case HealthCheck =>
      // BUG FIX: the previous code used scheduleOnce(3.second)(HealthCheck), which only
      // *evaluates* the HealthCheck object after the delay (a no-op thunk) instead of
      // delivering it to this actor, so the health check fired exactly once. Use the
      // (delay, receiver, message) overload so the tick keeps re-arming itself.
      context.system.scheduler.scheduleOnce(3.seconds, self, HealthCheck)
      if (state != State.ACTIVE && (transitionEnd - transitionStart) > transitWarningThreshold) {
        LOG.error(s"Executor status: " + state +
          s", it takes too long(${transitionEnd - transitionStart}) to do transition")
      }
  }

  /** Instantiates and initializes the configured serialization framework. */
  private def getSerializerPool(): SerializationFramework = {
    val system = context.system.asInstanceOf[ExtendedActorSystem]
    val clazz = Class.forName(systemConfig.getString(Constants.GEARPUMP_SERIALIZER_POOL))
    val pool = clazz.newInstance().asInstanceOf[SerializationFramework]
    pool.init(system, userConf)
    pool
  }

  private def unHandled(state: String): Receive = {
    case other =>
      LOG.info(s"Received unknown message $other in state: $state")
  }

  /** Wraps a state-specific handler with the always-on service handlers and a catch-all. */
  private def box(receive: Receive): Receive = {
    executorService orElse receive orElse unHandled(state)
  }
}

object Executor {
  case class RestartTasks(dagVersion: Int)

  /**
   * Per-task history of TaskArguments, newest first, so a task can be
   * relaunched with the argument matching any past dag version.
   */
  class TaskArgumentStore {

    private var store = Map.empty[TaskId, List[TaskArgument]]

    def add(taskId: TaskId, task: TaskArgument): Unit = {
      val list = store.getOrElse(taskId, List.empty[TaskArgument])
      store += taskId -> (task :: list)
    }

    /** Newest argument whose dagVersion is <= the requested version. */
    def get(dagVersion: Int, taskId: TaskId): Option[TaskArgument] = {
      store.get(taskId).flatMap { list =>
        list.find { arg =>
          arg.dagVersion <= dagVersion
        }
      }
    }

    /**
     * When the new DAG is successfully deployed, then we should remove obsolete
     * TaskArgument of old DAG.
     */
    def removeObsoleteVersion(): Unit = {
      store = store.map { kv =>
        val (k, list) = kv
        (k, list.take(1))
      }
    }

    def removeNewerVersion(currentVersion: Int): Unit = {
      store = store.map { kv =>
        val (k, list) = kv
        (k, list.filter(_.dagVersion <= currentVersion))
      }
    }
  }

  case class TaskStopped(task: ActorRef)

  case class ExecutorSummary(
      id: Int,
      workerId: WorkerId,
      actorPath: String,
      logFile: String,
      status: String,
      taskCount: Int,
      tasks: Map[ProcessorId, List[TaskId]],
      jvmName: String
  )

  object ExecutorSummary {
    def empty: ExecutorSummary = {
      ExecutorSummary(0, WorkerId.unspecified, "", "", "", 1, null, jvmName = "")
    }
  }

  case class GetExecutorSummary(executorId: Int)

  case class QueryExecutorConfig(executorId: Int)

  case class ExecutorConfig(config: Config)

  class DagVersionMismatchException(msg: String) extends Exception(msg)

  object State {
    val ACTIVE = "active"
    val DYNAMIC_DAG_PHASE1 = "dynamic_dag_phase1"
    val DYNAMIC_DAG_PHASE2 = "dynamic_dag_phase2"
    val RECOVERY = "dag_recovery"
  }

  object HealthCheck
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.gearpump.streaming.executor

import scala.concurrent.duration.Duration

import org.apache.gearpump.streaming.task.TaskId
import org.apache.gearpump.util.RestartPolicy

/**
 * Controls how many retries to recover failed executors.
 *
 * @param maxNrOfRetries the number of times an executor is allowed to be restarted,
 *                       negative value means no limit, if the limit is exceeded the policy
 *                       will not allow to restart the executor
 * @param withinTimeRange duration of the time window for maxNrOfRetries, Duration.Inf
 *                        means no window
 */
class ExecutorRestartPolicy(maxNrOfRetries: Int, withinTimeRange: Duration) {
  // Tasks currently assigned to each executor.
  private var executorToTaskIds = Map.empty[Int, Set[TaskId]]
  // One restart policy per task (typo "Polocies" fixed; field is private, no API change).
  private var taskRestartPolicies = Map.empty[TaskId, RestartPolicy]

  /** Records that `taskId` runs on `executorId`, creating its restart policy if new. */
  def addTaskToExecutor(executorId: Int, taskId: TaskId): Unit = {
    val taskSetForExecutorId = executorToTaskIds.getOrElse(executorId, Set.empty[TaskId]) + taskId
    executorToTaskIds += executorId -> taskSetForExecutorId
    if (!taskRestartPolicies.contains(taskId)) {
      taskRestartPolicies += taskId -> new RestartPolicy(maxNrOfRetries, withinTimeRange)
    }
  }

  /**
   * Returns true when every task on the executor is still allowed to restart.
   * An unknown executor has no tasks and is trivially allowed.
   *
   * Fix: the original used a nonlocal `return` inside nested lambdas (implemented
   * via a control-flow exception, and requiring a scalastyle suppression).
   * `forall` short-circuits on the first disallowing policy, preserving the same
   * early-exit behavior idiomatically.
   */
  def allowRestartExecutor(executorId: Int): Boolean = {
    executorToTaskIds.getOrElse(executorId, Set.empty[TaskId]).forall { taskId =>
      taskRestartPolicies.get(taskId).forall(_.allowRestart)
    }
  }
}
package org.apache.gearpump.streaming.executor

import akka.actor.{Actor, ActorRef, ActorRefFactory, Props}

import org.apache.gearpump.cluster.{ExecutorContext, UserConfig}
import org.apache.gearpump.serializer.SerializationFramework
import org.apache.gearpump.streaming.ProcessorDescription
import org.apache.gearpump.streaming.executor.TaskLauncher.TaskArgument
import org.apache.gearpump.streaming.task._
import org.apache.gearpump.streaming.util.ActorPathUtil

/** Abstraction over task-actor creation (allows substituting a fake launcher in tests). */
trait ITaskLauncher {

  /**
   * Launches a list of task actors.
   *
   * @param taskIds ids of the tasks to start
   * @param argument DAG information shared by these tasks (processor description,
   *                 downstream subscribers)
   * @param context actor factory the task actors are created under
   * @param serializer serialization framework handed to every task actor
   * @param dispatcher name of the akka dispatcher the task actors run on
   * @return map from task id to the launched actor
   */
  def launch(taskIds: List[TaskId], argument: TaskArgument,
      context: ActorRefFactory, serializer: SerializationFramework, dispatcher: String)
    : Map[TaskId, ActorRef]
}

class TaskLauncher(
    appId: Int,
    appName: String,
    executorId: Int,
    appMaster: ActorRef,
    userConf: UserConfig,
    taskActorClass: Class[_ <: Actor])
  extends ITaskLauncher {

  override def launch(
      taskIds: List[TaskId], argument: TaskArgument,
      context: ActorRefFactory, serializer: SerializationFramework, dispatcher: String)
    : Map[TaskId, ActorRef] = {
    import argument.{processorDescription, subscribers}

    // processor-level config overrides the executor-wide user config
    val taskConf = userConf.withConfig(processorDescription.taskConf)

    // context data shared by all tasks of this processor on this executor
    val taskContext = TaskContextData(executorId,
      appId, appName, appMaster,
      processorDescription.parallelism,
      processorDescription.life, subscribers)

    val taskClass = TaskUtil.loadClass(processorDescription.taskClass)

    // Build the result map directly; the original accumulated into a `var` with
    // foreach, which is less idiomatic than map(...).toMap and behaves the same.
    taskIds.map { taskId =>
      val task = new TaskWrapper(taskId, taskClass, taskContext, taskConf)
      val taskActor = context.actorOf(
        Props(taskActorClass, taskId, taskContext, userConf, task, serializer)
          .withDispatcher(dispatcher),
        ActorPathUtil.taskActorName(taskId))
      taskId -> taskActor
    }.toMap
  }
}

object TaskLauncher {

  /** Per-processor information needed to launch its tasks for a given DAG version. */
  case class TaskArgument(
      dagVersion: Int, processorDescription: ProcessorDescription,
      subscribers: List[Subscriber])

  /** Creates a launcher bound to the standard [[TaskActor]] implementation. */
  def apply(executorContext: ExecutorContext, userConf: UserConfig): TaskLauncher = {
    import executorContext.{appId, appMaster, appName, executorId}
    new TaskLauncher(appId, appName, executorId, appMaster, userConf, classOf[TaskActor])
  }
}
+ */ + +package org.apache.gearpump.streaming.metrics + +import java.util + +import com.typesafe.config.Config + +import org.apache.gearpump.TimeStamp +import org.apache.gearpump.cluster.ClientToMaster.ReadOption +import org.apache.gearpump.cluster.MasterToClient.HistoryMetricsItem +import io.gearpump.google.common.collect.Iterators +import org.apache.gearpump.metrics.Metrics.{Histogram, Meter} +import org.apache.gearpump.metrics.MetricsAggregator +import org.apache.gearpump.streaming.metrics.ProcessorAggregator._ +import org.apache.gearpump.util.HistoryMetricsService.HistoryMetricsConfig + +/** + * + * Does aggregation on metrics after grouping by these three attributes: + * 1. processorId + * 2. time section(represented as a index integer) + * 3. metricName(like sendThroughput) + * + * It assumes that for each [[org.apache.gearpump.cluster.MasterToClient.HistoryMetricsItem]], the + * name follow the format app(appId).processor(processorId).task(taskId).(metricName) + * + * It parses the name to get processorId and metricName. If the parsing fails, then current + * [[org.apache.gearpump.cluster.MasterToClient.HistoryMetricsItem]] will be skipped. + * + * NOTE: this class is optimized for performance. 
class ProcessorAggregator(historyMetricConfig: HistoryMetricsConfig) extends MetricsAggregator {

  def this(config: Config) = {
    this(HistoryMetricsConfig(config))
  }

  private val aggregatorFactory: AggregatorFactory = new AggregatorFactory()

  /**
   * Accepts options:
   * key: "readOption", value: one of "readLatest", "readRecent", "readHistory"
   * (defaults to "readLatest" when absent)
   */
  override def aggregate(options: Map[String, String],
    inputs: Iterator[HistoryMetricsItem]): List[HistoryMetricsItem] = {
    // getOrElse(key, default) instead of get(key).getOrElse(default)
    val readOption = options.getOrElse(ReadOption.Key, ReadOption.ReadLatest)
    aggregate(readOption, inputs, System.currentTimeMillis())
  }

  /**
   * Groups Meter/Histogram items by (time slot, "processor:metricName") and folds each
   * group into a single item. Items outside [start, end) or with unparseable names
   * are skipped. Written with while-loops and a mutable scratch TaskIdentity on
   * purpose — this path is performance sensitive.
   */
  def aggregate(
    readOption: ReadOption.ReadOption, inputs: Iterator[HistoryMetricsItem], now: TimeStamp)
    : List[HistoryMetricsItem] = {
    val (start, end, interval) = getTimeRange(readOption, now)
    val timeSlotsCount = ((end - start - 1) / interval + 1).toInt
    val map = new MultiLayerMap[Aggregator](timeSlotsCount)

    // reused across iterations to avoid per-item allocation
    val taskIdentity = new TaskIdentity(0, null)
    while (inputs.hasNext) {
      val item = inputs.next()

      if (item.value.isInstanceOf[Meter] || item.value.isInstanceOf[Histogram]) {
        if (item.time >= start && item.time < end) {
          val timeIndex = ((item.time - start) / interval).toInt

          if (parseName(item.value.name, taskIdentity)) {
            var op = map.get(timeIndex, taskIdentity.group)
            if (op == null) {
              op = aggregatorFactory.create(item, taskIdentity.group)
              map.put(timeIndex, taskIdentity.group, op)
            }
            op.aggregate(item)
          }
        }
      }
    }

    // collect one result per aggregator
    val result = new Array[HistoryMetricsItem](map.size)
    val iterator = map.valueIterator
    var index = 0
    while (iterator.hasNext()) {
      val op = iterator.next()
      result(index) = op.result
      index += 1
    }

    result.toList
  }

  // Returns (start, end, interval)
  private def getTimeRange(readOption: ReadOption.ReadOption, now: TimeStamp)
    : (TimeStamp, TimeStamp, TimeStamp) = {
    readOption match {
      case ReadOption.ReadRecent =>
        val end = now
        val start = end - (historyMetricConfig.retainRecentDataSeconds) * 1000
        val interval = historyMetricConfig.retainRecentDataIntervalMs
        (floor(start, interval), floor(end, interval), interval)
      case ReadOption.ReadHistory =>
        val end = now
        val start = end - (historyMetricConfig.retainHistoryDataHours) * 3600 * 1000
        val interval = historyMetricConfig.retainHistoryDataIntervalMs
        (floor(start, interval), floor(end, interval), interval)
      case _ =>
        // All data points are aggregated together (single slot).
        (0L, Long.MaxValue, Long.MaxValue)
    }
  }

  // The original metrics data is divided by interval points:
  // time series (0, interval, 2*interval, 3*interval....)
  // floor(..) makes sure the Aggregator uses the same set of interval points.
  private def floor(value: Long, interval: Long): Long = {
    (value / interval) * interval
  }

  // Parses "app0.processor0.task0:sendThroughput" style names; on success stores
  // task id and the group id "app0.processor0:sendThroughput" into `result`.
  // Returns false (leaving `result` possibly partially updated) when the name
  // does not follow the expected format.
  private def parseName(name: String, result: TaskIdentity): Boolean = {
    val taskIndex = name.indexOf(TASK_TAG)
    if (taskIndex > 0) {
      val processor = name.substring(0, taskIndex)
      val typeIndex = name.indexOf(":", taskIndex + 1)
      if (typeIndex > 0) {
        result.task = (name.substring(taskIndex + TASK_TAG.length, typeIndex)).toShort
        // metricName keeps its leading ":" so group reads "processor:metricName"
        val metricName = name.substring(typeIndex)
        result.group = processor + metricName
        true
      } else {
        false
      }
    } else {
      false
    }
  }
}

object ProcessorAggregator {
  val readOption = ReadOption.Key

  private val TASK_TAG = ".task"

  // mutable scratch holder; reused to avoid allocation in the hot loop
  private class TaskIdentity(var task: Short, var group: String)

  /**
   * MultiLayerMap has multiple layers; each layer is a hash map. To access a value
   * with get, the user specifies the layer id, then the key.
   *
   * This class is optimized for performance.
   */
  class MultiLayerMap[Value](layers: Int) {

    private var _size: Int = 0
    private val map: Array[java.util.HashMap[String, Value]] = createMap(layers)

    /**
     * @param key key in the given layer
     * @return null if the key is not found or the layer is out of range
     */
    def get(layer: Int, key: String): Value = {
      if (layer < layers) {
        map(layer).get(key)
      } else {
        null.asInstanceOf[Value]
      }
    }

    def put(layer: Int, key: String, value: Value): Unit = {
      if (layer < layers) {
        // Fix: only count newly inserted keys. The original incremented _size on
        // every put, so overwriting an existing key made `size` overcount.
        // HashMap.put returns the previous value, or null when the key was absent.
        if (map(layer).put(key, value) == null) {
          _size += 1
        }
      }
    }

    /** Total number of distinct (layer, key) entries. */
    def size: Int = _size

    /** Iterates all values, layer by layer. */
    def valueIterator: util.Iterator[Value] = {
      val iterators = new Array[util.Iterator[Value]](layers)
      var layer = 0
      while (layer < layers) {
        iterators(layer) = map(layer).values().iterator()
        layer += 1
      }

      Iterators.concat(iterators: _*)
    }

    private def createMap(layers: Int) = {
      val map = new Array[java.util.HashMap[String, Value]](layers)
      var index = 0
      val length = map.length
      while (index < length) {
        map(index) = new java.util.HashMap[String, Value]()
        index += 1
      }
      map
    }
  }

  /** Folds a stream of items of one metric type into a single summary item. */
  trait Aggregator {
    def aggregate(item: HistoryMetricsItem): Unit
    def result: HistoryMetricsItem
  }

  /** Averages histogram statistics; result is timestamped with the earliest input. */
  class HistogramAggregator(name: String) extends Aggregator {

    var count: Long = 0
    var mean: Double = 0
    var stddev: Double = 0
    var median: Double = 0
    var p95: Double = 0
    var p99: Double = 0
    var p999: Double = 0

    var startTime: TimeStamp = Long.MaxValue

    override def aggregate(item: HistoryMetricsItem): Unit = {
      val input = item.value.asInstanceOf[Histogram]
      count += 1
      mean += input.mean
      stddev += input.stddev
      median += input.median
      p95 += input.p95
      p99 += input.p99
      p999 += input.p999

      if (item.time < startTime) {
        startTime = item.time
      }
    }

    override def result: HistoryMetricsItem = {
      if (count > 0) {
        HistoryMetricsItem(startTime, Histogram(name, mean / count, stddev / count,
          median / count, p95 / count, p99 / count, p999 / count))
      } else {
        // no inputs seen: return an all-zero histogram at time 0
        HistoryMetricsItem(0, Histogram(name, 0, 0, 0, 0, 0, 0))
      }
    }
  }

  /** Sums counts and rates of meters; result is timestamped with the earliest input. */
  class MeterAggregator(name: String) extends Aggregator {

    var count: Long = 0
    var meanRate: Double = 0
    var m1: Double = 0
    var rateUnit: String = null

    var startTime: TimeStamp = Long.MaxValue

    override def aggregate(item: HistoryMetricsItem): Unit = {

      val input = item.value.asInstanceOf[Meter]
      count += input.count

      meanRate += input.meanRate
      m1 += input.m1

      // keep the rate unit of the first item seen
      if (null == rateUnit) {
        rateUnit = input.rateUnit
      }

      if (item.time < startTime) {
        startTime = item.time
      }
    }

    override def result: HistoryMetricsItem = {
      HistoryMetricsItem(startTime, Meter(name, count, meanRate,
        m1, rateUnit))
    }
  }

  /** Picks the aggregator matching the item's metric type; null for unsupported types. */
  class AggregatorFactory {
    def create(item: HistoryMetricsItem, name: String): Aggregator = {
      item.value match {
        case meter: Meter => new MeterAggregator(name)
        case histogram: Histogram => new HistogramAggregator(name)
        case _ => null // not supported
      }
    }
  }
}
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.metrics + +import scala.collection.mutable.ListBuffer +import scala.util.{Failure, Success, Try} + +import com.typesafe.config.Config + +import org.apache.gearpump.cluster.ClientToMaster.ReadOption +import org.apache.gearpump.cluster.MasterToClient.HistoryMetricsItem +import org.apache.gearpump.metrics.MetricsAggregator +import org.apache.gearpump.util.{Constants, LogUtil} + +/** + * Filters the latest metrics data by specifying a + * processor Id range, and taskId range. 
class TaskFilterAggregator(maxLimit: Int) extends MetricsAggregator {

  import org.apache.gearpump.streaming.metrics.TaskFilterAggregator._

  def this(config: Config) = {
    this(config.getInt(Constants.GEARPUMP_METRICS_MAX_LIMIT))
  }

  /** Only the "readLatest" snapshot can be filtered; anything else yields an empty list. */
  override def aggregate(options: Map[String, String], inputs: Iterator[HistoryMetricsItem])
    : List[HistoryMetricsItem] = {

    if (!options.get(ReadOption.Key).contains(ReadOption.ReadLatest)) {
      List.empty[HistoryMetricsItem]
    } else {
      Option(Options.parse(options)) match {
        case Some(parsed) => aggregate(parsed, inputs)
        case None => List.empty[HistoryMetricsItem]
      }
    }
  }

  /**
   * Keeps items whose processor id and task id fall inside the half-open ranges in
   * `options`, stopping after min(options.limit, maxLimit) matches. Matched items are
   * prepended, so the returned list is in reverse input order (as in the original).
   */
  def aggregate(options: Options, inputs: Iterator[HistoryMetricsItem])
    : List[HistoryMetricsItem] = {

    val collected = new ListBuffer[HistoryMetricsItem]
    val cap = Math.min(options.limit, maxLimit)
    var taken = 0

    // mutable scratch holder, reused across iterations
    val identity = new TaskIdentity(0, 0)

    while (inputs.hasNext && taken < cap) {
      val item = inputs.next()
      val inRange = parseName(item.value.name, identity) &&
        identity.processor >= options.startProcessor &&
        identity.processor < options.endProcessor &&
        identity.task >= options.startTask &&
        identity.task < options.endTask
      if (inRange) {
        collected.prepend(item)
        taken += 1
      }
    }
    collected.toList
  }

  // Assumes the name format "app0.processor0.task0:sendThroughput"; on success stores
  // (processorId, taskId) into `result` and returns true.
  private def parseName(name: String, result: TaskIdentity): Boolean = {
    val processorStart = name.indexOf(PROCESSOR_TAG)
    if (processorStart == -1) {
      false
    } else {
      val taskStart = name.indexOf(TASK_TAG, processorStart + 1)
      if (taskStart == -1) {
        false
      } else {
        // processor id sits between the tags; written back before the task id is
        // validated, matching the original's mutation order
        result.processor =
          name.substring(processorStart + PROCESSOR_TAG.length, taskStart).toInt
        val taskEnd = name.indexOf(":", taskStart + 1)
        if (taskEnd == -1) {
          false
        } else {
          result.task = name.substring(taskStart + TASK_TAG.length, taskEnd).toInt
          true
        }
      }
    }
  }
}

object TaskFilterAggregator {
  val StartTask = "startTask"
  val EndTask = "endTask"
  val StartProcessor = "startProcessor"
  val EndProcessor = "endProcessor"
  val Limit = "limit"

  val TASK_TAG = ".task"
  val PROCESSOR_TAG = ".processor"

  private class TaskIdentity(var processor: Int, var task: Int)

  /** Half-open filter ranges plus the maximum number of items to return. */
  case class Options(
    limit: Int, startTask: Int, endTask: Int, startProcessor: Int, endProcessor: Int)

  private val LOG = LogUtil.getLogger(getClass)

  object Options {

    /** An Options value that filters nothing and never caps the result. */
    def acceptAll: Options = {
      new Options(Int.MaxValue, 0, Int.MaxValue, 0, Int.MaxValue)
    }

    /** Sanity-checks and parses the raw string options; returns null on bad input. */
    def parse(options: Map[String, String]): Options = {
      def intOption(key: String, default: Int): Int =
        options.get(key).map(_.toInt).getOrElse(default)

      Try {
        // evaluated in the same order as the original, so the same bad value
        // produces the same exception message
        val startTask = intOption(StartTask, 0)
        val endTask = intOption(EndTask, Integer.MAX_VALUE)
        val startProcessor = intOption(StartProcessor, 0)
        val endProcessor = intOption(EndProcessor, Integer.MAX_VALUE)
        val limit = intOption(Limit, DEFAULT_LIMIT)
        new Options(limit, startTask, endTask, startProcessor, endProcessor)
      } match {
        case Success(parsed) => parsed
        case Failure(ex) =>
          LOG.error("Failed to parse the options in TaskFilterAggregator. Error msg: " +
            ex.getMessage)
          null
      }
    }
  }

  val DEFAULT_LIMIT = 1000
  val MAX_LIMIT = 1000
}
package org.apache.gearpump

/** Type aliases shared across the streaming module. */
package object streaming {
  // Id of a processor in the streaming application
  type ProcessorId = Int
  // Index of a task within its processor — presumably in [0, parallelism); confirm at call sites
  type TaskIndex = Int
  // Id of an executor hosting tasks
  type ExecutorId = Int
}
package org.apache.gearpump.streaming.sink

import org.apache.gearpump.Message
import org.apache.gearpump.streaming.task.TaskContext

/**
 * Interface to implement a custom data sink, where the result of a DAG is typically
 * written; a DataSink could be a data store like HBase, or simply the console.
 *
 * An example would be like:
 * {{{
 * class ConsoleSink extends DataSink {
 *
 *   def open(context: TaskContext): Unit = {}
 *
 *   // note: fixed the original example, which extended a non-existent
 *   // DataSink[String] and wrote a String instead of a Message
 *   def write(message: Message): Unit = {
 *     Console.println(message)
 *   }
 *
 *   def close(): Unit = {}
 * }
 * }}}
 *
 * Subclass is required to be serializable
 */
trait DataSink extends java.io.Serializable {

  /**
   * Opens connection to data sink
   * invoked at onStart() method of [[org.apache.gearpump.streaming.task.Task]]
   * @param context is the task context at runtime
   */
  def open(context: TaskContext): Unit

  /**
   * Writes message into data sink
   * invoked at onNext() method of [[org.apache.gearpump.streaming.task.Task]]
   * @param message wraps data to be written out
   */
  def write(message: Message): Unit

  /**
   * Closes connection to data sink
   * invoked at onClose() method of [[org.apache.gearpump.streaming.task.Task]]
   */
  def close(): Unit
}
package org.apache.gearpump.streaming.sink

import akka.actor.ActorSystem

import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.Processor

/**
 * Utility that helps user to create a DAG ending in [[DataSink]]
 * user should pass in a [[DataSink]].
 *
 * here is an example to build a DAG that does word count and write to KafkaSink
 * {{{
 * val split = Processor[Split](1)
 * val sum = Processor[Sum](1)
 * val sink = new KafkaSink()
 * val sinkProcessor = DataSinkProcessor(sink, 1)
 * // note: fixed the original example, which chained to `sink` (a DataSink,
 * // not a Processor) instead of `sinkProcessor`
 * val dag = split ~> sum ~> sinkProcessor
 * }}}
 */
object DataSinkProcessor {
  // dataSink: the sink implementation each DataSinkTask will run
  // parallelism: number of DataSinkTask instances
  // description: human-readable processor name
  // taskConf: extra per-task config; the sink itself is stored under DataSinkTask.DATA_SINK
  def apply(
    dataSink: DataSink,
    parallelism: Int,
    description: String = "",
    taskConf: UserConfig = UserConfig.empty)(implicit system: ActorSystem)
    : Processor[DataSinkTask] = {
    Processor[DataSinkTask](parallelism, description = description,
      taskConf.withValue[DataSink](DataSinkTask.DATA_SINK, dataSink))
  }
}
b/streaming/src/main/scala/org/apache/gearpump/streaming/sink/DataSinkTask.scala new file mode 100644 index 0000000..eb6118d --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/sink/DataSinkTask.scala @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.gearpump.streaming.sink

import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}

object DataSinkTask {
  // UserConfig key under which DataSinkProcessor stores the DataSink instance
  val DATA_SINK = "data_sink"
}

/**
 * General task that runs any [[DataSink]]:
 * opens it on start, writes every incoming message to it, and closes it on stop.
 */
class DataSinkTask(context: TaskContext, conf: UserConfig) extends Task(context, conf) {
  import org.apache.gearpump.streaming.sink.DataSinkTask._

  // fails fast (NoSuchElementException from .get) if no sink was injected via the config
  private val sink = conf.getValue[DataSink](DATA_SINK).get

  /** Opens the sink connection once, before any message arrives. */
  override def onStart(startTime: StartTime): Unit = {
    LOG.info("opening data sink...")
    sink.open(context)
  }

  /** Forwards each incoming message to the sink. */
  override def onNext(message: Message): Unit = {
    sink.write(message)
  }

  /** Closes the sink connection when the task stops. */
  override def onStop(): Unit = {
    LOG.info("closing data sink...")
    sink.close()
  }
}
package org.apache.gearpump.streaming.source

import org.apache.gearpump.streaming.task.TaskContext
import org.apache.gearpump.Message

// NOTE(review): Random appears unused in this file — confirm before removing
import scala.util.Random

/**
 * Interface to implement custom source where data is read into the system.
 * a DataSource could be a message queue like kafka or simply data generation source.
 *
 * An example would be like
 * {{{
 * class GenMsgSource extends DataSource {
 *
 *   def open(context: TaskContext, startTime: Long): Unit = {}
 *
 *   // note: fixed the original example, which declared read(context: TaskContext)
 *   // and did not match the parameterless read() defined below
 *   def read(): Message = {
 *     Message("message")
 *   }
 *
 *   def close(): Unit = {}
 * }
 * }}}
 *
 * subclass is required to be serializable
 */
trait DataSource extends java.io.Serializable {

  /**
   * Opens connection to data source
   * invoked in onStart() method of [[org.apache.gearpump.streaming.source.DataSourceTask]]
   *
   * @param context is the task context at runtime
   * @param startTime is the start time of system
   */
  def open(context: TaskContext, startTime: Long): Unit

  /**
   * Reads next message from data source and
   * returns null if no message is available
   *
   * @return a [[org.apache.gearpump.Message]] or null
   */
  def read(): Message

  /**
   * Closes connection to data source.
   * invoked in onStop() method of [[org.apache.gearpump.streaming.source.DataSourceTask]]
   */
  def close(): Unit
}
package org.apache.gearpump.streaming.source

/** Config keys recognized by the data-source tasks. */
object DataSourceConfig {

  // How many read() calls DataSourceTask issues per scheduling round (defaults to 1000 there)
  val SOURCE_READ_BATCH_SIZE = "gearpump.source.read.batch.size"
  // Class name of a timestamp filter — usage is not visible in this file; confirm at call sites
  val SOURCE_TIMESTAMP_FILTER = "gearpump.source.timestamp.filter.class"
}
+ */ + +package org.apache.gearpump.streaming.source + +import akka.actor.ActorSystem + +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.Processor + +/** + * Utility that helps user to create a DAG starting with [[DataSourceTask]] + * user should pass in a [[DataSource]] + * + * Here is an example to build a DAG that reads from Kafka source followed by word count + * {{{ + * val source = new KafkaSource() + * val sourceProcessor = DataSourceProcessor(source, 1) + * val split = Processor[Split](1) + * val sum = Processor[Sum](1) + * val dag = sourceProcessor ~> split ~> sum + * }}} + */ +object DataSourceProcessor { + def apply( + dataSource: DataSource, + parallelism: Int, + description: String = "", + taskConf: UserConfig = UserConfig.empty)(implicit system: ActorSystem) + : Processor[DataSourceTask] = { + Processor[DataSourceTask](parallelism, description = description, + taskConf.withValue[DataSource](DataSourceTask.DATA_SOURCE, dataSource)) + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala new file mode 100644 index 0000000..6777721 --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.source + +import org.apache.gearpump._ +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext} + +object DataSourceTask { + val DATA_SOURCE = "data_source" +} + +/** + * Default Task container for [[org.apache.gearpump.streaming.source.DataSource]] that + * reads from DataSource in batch + * See [[org.apache.gearpump.streaming.source.DataSourceProcessor]] for its usage + * + * DataSourceTask calls: + * - `DataSource.open()` in `onStart` and pass in + * [[org.apache.gearpump.streaming.task.TaskContext]] + * and application start time + * - `DataSource.read()` in each `onNext`, which reads a batch of messages + * - `DataSource.close()` in `onStop` + */ +class DataSourceTask(context: TaskContext, conf: UserConfig) extends Task(context, conf) { + import org.apache.gearpump.streaming.source.DataSourceTask._ + + private val source = conf.getValue[DataSource](DATA_SOURCE).get + private val batchSize = conf.getInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE).getOrElse(1000) + private var startTime = 0L + + override def onStart(newStartTime: StartTime): Unit = { + startTime = newStartTime.startTime + LOG.info(s"opening data source at $startTime") + source.open(context, startTime) + self ! Message("start", System.currentTimeMillis()) + } + + override def onNext(message: Message): Unit = { + 0.until(batchSize).foreach { _ => + Option(source.read()).foreach(context.output) + } + self ! 
Message("continue", System.currentTimeMillis()) + } + + override def onStop(): Unit = { + LOG.info("closing data source...") + source.close() + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/source/DefaultTimeStampFilter.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DefaultTimeStampFilter.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DefaultTimeStampFilter.scala new file mode 100644 index 0000000..df54cc2 --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DefaultTimeStampFilter.scala @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.source + +import org.apache.gearpump.streaming.transaction.api.TimeStampFilter +import org.apache.gearpump.{Message, TimeStamp} + +/** + * TimeStampFilter filters out messages which have obsolete (smaller) timestamp. 
+ */ +class DefaultTimeStampFilter extends TimeStampFilter { + override def filter(msg: Message, predicate: TimeStamp): Option[Message] = { + Option(msg).find(_.timestamp >= predicate) + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/Monoid.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/Monoid.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/Monoid.scala new file mode 100644 index 0000000..9886a72 --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/Monoid.scala @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.streaming.state.api + +trait Monoid[T] extends java.io.Serializable { + def plus(l: T, r: T): T + def zero: T +} + +trait Group[T] extends Monoid[T] { + def minus(l: T, r: T): T +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/MonoidState.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/MonoidState.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/MonoidState.scala new file mode 100644 index 0000000..0e2f83a --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/MonoidState.scala @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.streaming.state.api + +import org.apache.gearpump.TimeStamp + +/** + * MonoidState uses Algebird Monoid to aggregate state + * + * on start, state value is initialized to monoid.zero + * on each new message, existing state value is aggregated with + * the incoming value using monoid.plus to get a new state value + */ +abstract class MonoidState[T](monoid: Monoid[T]) extends PersistentState[T] { + // Left state updated by messages before checkpoint time + private[state] var left: T = monoid.zero + // Right state updated by message after checkpoint time + private[state] var right: T = monoid.zero + + protected var checkpointTime = Long.MaxValue + + override def get: Option[T] = Option(monoid.plus(left, right)) + + override def setNextCheckpointTime(nextCheckpointTime: TimeStamp): Unit = { + checkpointTime = nextCheckpointTime + } + + protected def updateState(timestamp: TimeStamp, t: T): Unit = { + if (timestamp < checkpointTime) { + left = monoid.plus(left, t) + } else { + right = monoid.plus(right, t) + } + } +} + http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentState.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentState.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentState.scala new file mode 100644 index 0000000..906d331 --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentState.scala @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.state.api + +import org.apache.gearpump._ + +/** + * PersistentState is part of the transaction API + * + * Users could get transaction support from the framework by + * conforming to PersistentState APIs and extending PersistentTask + * to manage the state + */ +trait PersistentState[T] { + + /** + * Recovers state to a previous checkpoint + * usually invoked by the framework + */ + def recover(timestamp: TimeStamp, bytes: Array[Byte]): Unit + + /** + * Updates state on a new message + * this is invoked by user + */ + def update(timestamp: TimeStamp, t: T): Unit + + /** + * Sets next checkpoint time + * should be invoked by the framework + */ + def setNextCheckpointTime(timeStamp: TimeStamp): Unit + + /** + * Gets a binary snapshot of state + * usually invoked by the framework + */ + def checkpoint(): Array[Byte] + + /** + * Unwraps the raw value of state + */ + def get: Option[T] +} + http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala new file mode 100644 index 0000000..e40d8cd --- 
/dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.state.api + +import java.util.concurrent.TimeUnit +import scala.concurrent.duration.FiniteDuration + +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.state.impl.{CheckpointManager, PersistentStateConfig} +import org.apache.gearpump.streaming.task.{ReportCheckpointClock, StartTime, Task, TaskContext} +import org.apache.gearpump.streaming.transaction.api.CheckpointStoreFactory +import org.apache.gearpump.util.LogUtil +import org.apache.gearpump.{Message, TimeStamp} + +object PersistentTask { + val CHECKPOINT = Message("checkpoint") + val LOG = LogUtil.getLogger(getClass) +} + +/** + * PersistentTask is part of the transaction API + * + * Users should extend this task if they want to get transaction support + * from the framework + */ +abstract class PersistentTask[T](taskContext: TaskContext, conf: UserConfig) + extends Task(taskContext, conf) { + import taskContext._ + + import org.apache.gearpump.streaming.state.api.PersistentTask._ + + val checkpointStoreFactory = 
conf.getValue[CheckpointStoreFactory]( + PersistentStateConfig.STATE_CHECKPOINT_STORE_FACTORY).get + val checkpointStore = checkpointStoreFactory.getCheckpointStore(conf, taskContext) + val checkpointInterval = conf.getLong(PersistentStateConfig.STATE_CHECKPOINT_INTERVAL_MS).get + val checkpointManager = new CheckpointManager(checkpointInterval, checkpointStore) + // System time interval to attempt checkpoint + private val checkpointAttemptInterval = 1000L + + /** + * Subclass should override this method to pass in a PersistentState. the framework has already + * offered two states: + * - NonWindowState: state with no time or other boundary + * - WindowState: each state is bounded by a time window + */ + def persistentState: PersistentState[T] + + /** + * Subclass should override this method to specify how a new message should update state + */ + def processMessage(state: PersistentState[T], message: Message): Unit + + /** Persistent state that will be stored (by checkpointing) automatically to storage like HDFS */ + val state = persistentState + + final override def onStart(startTime: StartTime): Unit = { + val timestamp = startTime.startTime + checkpointManager + .recover(timestamp) + .foreach(state.recover(timestamp, _)) + + reportCheckpointClock(timestamp) + scheduleCheckpoint(checkpointAttemptInterval) + } + + final override def onNext(message: Message): Unit = { + message match { + case CHECKPOINT => + val upstreamMinClock = taskContext.upstreamMinClock + if (checkpointManager.shouldCheckpoint(upstreamMinClock)) { + checkpointManager.getCheckpointTime.foreach { checkpointTime => + val serialized = state.checkpoint() + checkpointManager.checkpoint(checkpointTime, serialized) + .foreach(state.setNextCheckpointTime) + taskContext.output(Message(serialized, checkpointTime)) + reportCheckpointClock(checkpointTime) + } + } + scheduleCheckpoint(checkpointAttemptInterval) + case _ => + checkpointManager.update(message.timestamp) + 
.foreach(state.setNextCheckpointTime) + processMessage(state, message) + } + } + + final override def onStop(): Unit = { + checkpointManager.close() + } + + private def scheduleCheckpoint(interval: Long): Unit = { + scheduleOnce(new FiniteDuration(interval, TimeUnit.MILLISECONDS))(self ! CHECKPOINT) + } + + private def reportCheckpointClock(timestamp: TimeStamp): Unit = { + appMaster ! ReportCheckpointClock(taskContext.taskId, timestamp) + } +}
