Here is a complete example that I have in my head.
Job Manager -- 1 to many --> Job -- 1 to many --> Task
// --------- DOMAIN MODEL ------------
/** Lifecycle state of a [[Task]]: either [[Pending]] or [[Complete]]. */
sealed trait TaskStatus
case object Pending extends TaskStatus
case object Complete extends TaskStatus

/** A single unit of work inside a [[Job]]; `status` defaults to [[Pending]]. */
case class Task(id: String, status: TaskStatus = Pending)

/** A job identified by `id` and made up of `tasks`. */
case class Job(id: String, tasks: Seq[Task]) {

  /** True when every task is [[Complete]] (vacuously true for a job without tasks). */
  def isFinished: Boolean = tasks.forall(_.status == Complete)

  /**
   * Returns a copy of this job in which any task sharing `task.id` is replaced
   * by `task`, which is placed at the front of the task list.
   *
   * The status of `task` is taken as-is: the caller is responsible for passing
   * a task that is already marked [[Complete]]. If no task with that id exists,
   * `task` is simply added.
   */
  def finishTask(task: Task): Job =
    // `+:` prepends on any Seq; `::` only exists on List and does not compile here.
    copy(tasks = task +: tasks.filterNot(_.id == task.id))
}
// --------- DATABASE ------------
/**
 * Asynchronous persistence interface for [[Job]]s.
 *
 * Both operations return a `Future`; how failures (for example an unknown
 * job id) are reported is left to the implementation.
 */
trait JobDatabase {

  /** Looks up the job stored under `jobId`; callers use the result as the loaded job. */
  def get(jobId: String): Future[Job]

  /** Stores `job`; callers use the result as the job that was saved. */
  def save(job: Job): Future[Job]
}
// Job Manager oversees ALL jobs
// Supervision is not shown here, but assume that we want to restart jobs on failure
object JobManager {
  /** Request to persist `job` and then start processing it. */
  case class Submit(job: Job)

  /** Internal message: the database finished saving `job`. */
  private case class Persisted(job: Job)
}

/**
 * Oversees all jobs. Each submitted job is first saved through `db`; once the
 * save has succeeded a child [[JobProcessor]] is started for it.
 *
 * @param db database used to save submitted jobs; also handed to each processor
 */
class JobManager(db: JobDatabase) extends Actor {
  import JobManager._
  import akka.actor.Status
  import akka.pattern.pipe
  import context.dispatcher // ExecutionContext for the Future transformations below

  def receive: Receive = {
    case Submit(job) =>
      // Do not touch `context` from a Future callback: the callback runs on
      // another thread, outside the actor. Pipe the result back to ourselves
      // and create the child while processing a message instead.
      db.save(job).map(Persisted(_)) pipeTo self

    case Persisted(job) =>
      // Kick off processing of the new job. The job id is used as the child's
      // name so that the processor can load its state by that id.
      context.actorOf(Props(classOf[JobProcessor], db), job.id)

    case Status.Failure(e) =>
      // The save failed, so no processor is started. What else should happen
      // is out of scope for this example; the failure is only logged.
      context.system.log.error(e, "Could not save submitted job; no processor started")
  }
}
// Job Processor handles the processing of a long-running individual job,
// saving state as each task is completed...
// He loads his state from the database when he starts
object JobProcessor {
  case object Process
  /** The job was read from the database. */
  case class Loaded(job: Job)
  /** The job was written to the database. */
  case class Saved(job: Job)
}

/**
 * Processes one job: loads it from `db`, starts a [[TaskProcessor]] child for
 * every pending task, and saves the job each time a task completes. The actor
 * stops itself once every task of the job is complete.
 *
 * The job id is taken from the actor's own name, so the creator must name the
 * actor after the job.
 *
 * Behaviours:
 *  - `loading`: waiting for the job to arrive from the database; everything
 *    else is stashed.
 *  - `running`: waiting for task completions.
 *  - `saving`: exactly one save is in flight; further completions are stashed
 *    so that each save builds on the result of the previous one.
 *
 * A failed load or save is rethrown so that the parent's supervisor strategy
 * decides what happens.
 *
 * @param db database used to load and save the job
 */
class JobProcessor(db: JobDatabase) extends Actor with Stash {
  import JobProcessor._
  import akka.actor.Status
  import akka.pattern.pipe
  import context.dispatcher // use the actor's dispatcher rather than the global pool

  val jobId = self.path.name

  // Last job state confirmed by the database; assigned when `Loaded` arrives.
  var state: Job = _

  /**
   * Requests the job from the database. The default `postRestart` calls
   * `preStart`, so this covers both the first start and every restart.
   */
  override def preStart(): Unit =
    db.get(jobId).map(Loaded(_)) pipeTo self

  def receive: Receive = loading

  def loading: Receive = {
    case Loaded(job) =>
      state = job
      if (state.isFinished) {
        // Nothing left to do (e.g. reloaded after the last task was saved).
        self ! PoisonPill
      } else {
        // Only start work that is still outstanding; completed tasks are not re-run.
        state.tasks.filter(_.status == Pending).foreach(processTask)
        unstashAll()
        context.become(running)
      }
    case Status.Failure(e) =>
      throw e // notify parent to restart us
    case _ =>
      // Not ready yet: keep the message until the job has been loaded.
      stash()
  }

  def running: Receive = {
    case TaskProcessor.Completed(task) =>
      // Mark the task as finished and save that. Switch to `saving` so that a
      // second completion cannot be computed from the same, now stale, state.
      db.save(state.finishTask(task)).map(Saved(_)) pipeTo self
      context.become(saving)
    case Status.Failure(e) =>
      throw e // notify parent to restart us
  }

  def saving: Receive = {
    case Saved(job) =>
      state = job
      if (state.isFinished) {
        self ! PoisonPill
      } else {
        unstashAll()
        context.become(running)
      }
    case Status.Failure(e) =>
      throw e // notify parent to restart us
    case _ =>
      // Handled once the in-flight save has been confirmed.
      stash()
  }

  /** Starts an anonymous child [[TaskProcessor]] and hands it `task`. */
  private def processTask(task: Task): Unit =
    context.actorOf(Props(classOf[TaskProcessor])) ! TaskProcessor.Process(task)
}
// Processes an individual task. I am less concerned about this guy, so do
// something simple
object TaskProcessor {
  /** Request to process `task`. */
  case class Process(task: Task)

  /** Notification sent to the parent, carrying the task marked [[Complete]]. */
  case class Completed(task: Task)
}

/**
 * Processes a single task: waits briefly as a stand-in for real work, then
 * reports the task to its parent with status [[Complete]].
 */
class TaskProcessor extends Actor {
  import TaskProcessor._

  def receive: Receive = {
    case Process(pending) =>
      // Placeholder for the actual work.
      Thread.sleep(100)
      val finished = pending.copy(status = Complete)
      context.parent ! Completed(finished)
  }
}
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ:
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.