Here is a complete example that I have in my head.

Job Manager -- 1 to many --> Job -- 1 to many --> Task

// --------- DOMAIN MODEL ------------
/** Lifecycle state of a [[Task]]. */
sealed trait TaskStatus
case object Pending extends TaskStatus
case object Complete extends TaskStatus

/** A single unit of work inside a [[Job]]; a new task is [[Pending]] unless stated otherwise. */
case class Task(id: String, status: TaskStatus = Pending)

/** A job and the tasks it is made of. */
case class Job(id: String, tasks: Seq[Task]) {

  /** True when every task is [[Complete]] (vacuously true for a job without tasks). */
  def isFinished: Boolean = tasks.forall(_.status == Complete)

  /**
   * Returns a copy of this job in which `task` replaces any existing task with the same id.
   *
   * The replacement is placed at the front of the task sequence. `+:` is used rather than
   * `::` because `tasks` is a general `Seq`, and `::` is only defined on `List`.
   */
  def finishTask(task: Task): Job =
    copy(tasks = task +: tasks.filterNot(_.id == task.id))
}

// --------- DATABASE ------------
/**
 * Asynchronous persistence for [[Job]]s.
 *
 * Both operations hand back a `Future`; the actors in this example treat a failed
 * future as an error to be escalated.
 */
trait JobDatabase {

  /** Stores `job`; the future yields the job as persisted. */
  def save(job: Job): Future[Job]

  /** Looks up the job stored under `jobId`. */
  def get(jobId: String): Future[Job]
}

// Job Manager oversees ALL jobs
// Supervision is not shown here; assume that failed jobs should be restarted
object JobManager {

  /** Asks the manager to persist `job` and then start a [[JobProcessor]] for it. */
  case class Submit(job: Job)

  /** Internal notification piped back to the manager once `job` has been saved. */
  private case class Persisted(job: Job)
}

/**
 * Accepts [[JobManager.Submit]] requests, saves the job and then spawns one
 * [[JobProcessor]] child per job.
 *
 * The result of the asynchronous save is piped back to this actor as a message, so the
 * child is created while processing a message rather than from a `Future` callback:
 * `context` must only be touched from within the actor itself.
 */
class JobManager(db: JobDatabase) extends Actor {

  import JobManager._
  import akka.pattern.pipe
  // Run the Future transformations on this actor's dispatcher.
  import context.dispatcher

  def receive: Receive = {
    case Submit(job) =>
      db.save(job).map(Persisted(_)).pipeTo(self)

    case Persisted(job) =>
      // Kick off processing of the new job; the job id is used as the actor name so that
      // the processor can load its state from the database.
      context.actorOf(Props(classOf[JobProcessor], db), job.id)

    case akka.actor.Status.Failure(e) =>
      // The save failed, so no processor is started. Error policy is out of scope for this
      // example; log instead of throwing, because restarting the manager would by default
      // stop every job that is already running.
      context.system.log.error(e, "Could not save a submitted job; no JobProcessor was started")
  }
}

// Job Processor handles the processing of a long-running individual job,
// saving state as each task is completed.
// It loads its state from the database when it starts.
object JobProcessor {

  // Not handled by the processor shown here; kept so the message protocol is unchanged.
  case object Process

  /** The job was read from the database. */
  case class Loaded(job: Job)

  /** The job was written to the database. */
  case class Saved(job: Job)
}

/**
 * Drives a single job to completion, persisting the job each time one of its tasks finishes.
 *
 * The actor's name is the job id. The job is (re)loaded from the database in `preStart`,
 * which runs on the first start and, through the default `postRestart`, after every restart.
 *
 * Behaviours:
 *  - `loading`: waiting for the job to arrive from the database; anything else is stashed.
 *  - `running`: waiting for task completions.
 *  - `saving`:  a save is in flight; further completions are stashed so that every save is
 *               based on the state produced by the previous one (no lost updates).
 *
 * A failed database call reaches the actor as `akka.actor.Status.Failure` and is rethrown,
 * which hands the decision to the parent's supervisor strategy.
 */
class JobProcessor(db: JobDatabase) extends Actor with Stash {

  import akka.pattern.pipe
  // Use the actor's own dispatcher for Future transformations instead of the global pool.
  import context.dispatcher

  val jobId = self.path.name

  // Unset (null) until the first Loaded message arrives; only read after that point.
  var state: Job = _

  override def preStart(): Unit =
    db.get(jobId).map(JobProcessor.Loaded(_)).pipeTo(self)

  def receive = loading

  def loading: Receive = {
    case JobProcessor.Loaded(job) =>
      // Job was loaded from the database.
      state = job
      if (state.isFinished)
        context.stop(self) // nothing left to do, e.g. restarted after the final save
      else {
        // Only start work that is still outstanding; completed tasks are not re-run.
        state.tasks.filterNot(_.status == Complete).foreach(processTask)
        unstashAll()
        context.become(running)
      }

    case akka.actor.Status.Failure(e) =>
      throw e // notify parent to restart us

    case _ =>
      stash() // handled once the job has been loaded
  }

  def running: Receive = {
    case TaskProcessor.Completed(task) =>
      // Update the job to mark this task as finished and save that.
      db.save(state.finishTask(task)).map(JobProcessor.Saved(_)).pipeTo(self)
      context.become(saving)

    case akka.actor.Status.Failure(e) =>
      throw e // notify parent to restart us
  }

  // A save is outstanding: hold back further completions until it has been acknowledged.
  private def saving: Receive = {
    case JobProcessor.Saved(job) =>
      state = job
      if (state.isFinished)
        context.stop(self)
      else {
        unstashAll()
        context.become(running)
      }

    case akka.actor.Status.Failure(e) =>
      throw e // notify parent to restart us

    case _ =>
      stash()
  }

  // Starts a child TaskProcessor for `task`; its Completed reply comes back to this actor.
  private def processTask(task: Task): Unit =
    context.actorOf(Props(classOf[TaskProcessor])) ! TaskProcessor.Process(task)
}

// Processes an individual task. I am less concerned about this one, so it
// does something simple.
object TaskProcessor {

  /** Request to work on `task`. */
  case class Process(task: Task)

  /** Reply carrying the task once it has been worked on. */
  case class Completed(task: Task)
}

/**
 * Stand-in worker: pauses briefly to simulate work, then reports the task back to its
 * parent with status [[Complete]].
 */
class TaskProcessor extends Actor {

  import TaskProcessor.{Completed, Process}

  def receive: Receive = {
    case Process(pending) =>
      Thread.sleep(100) // simulated work
      val finished = pending.copy(status = Complete)
      context.parent ! Completed(finished)
  }
}





-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to