Hi, *rrodseth*
thank you for link, I read it and try to re-write my code from Futures to
actor 'Tell', but at now example DAO are not async (but I think at now it's
not a problem):
import akka.actor._
import scala.concurrent.duration._
object ExampleActorBoot extends App {
val system = ActorSystem("ExampleActorBoot")
val dao = system.actorOf(Props[DaoActor], "dao")
val processor = system.actorOf(Props(new ProcessorActor("test", dao)),
"processor")
processor ! ProcessorActor.Process
}
case class TaskId(processorName: String, id: String)
case class TaskDetails(id: TaskId, status: String, json: String)
object DaoActor {
sealed trait DaoActorMessage {
def receiver: ActorRef
}
case class LoadTaskIds(processorName: String, receiver: ActorRef) extends
DaoActorMessage
case class LoadTaskDetails(id: TaskId, receiver: ActorRef) extends
DaoActorMessage
case class DeleteTask(id: TaskId, receiver: ActorRef) extends DaoActorMessage
case class MarkSuccess(id: TaskId, receiver: ActorRef) extends DaoActorMessage
}
class DaoActor extends Actor with ActorLogging {
import com.episodetracker.boot.DaoActor._
override def receive = {
case LoadTaskIds(processorName, receiver) =>
val result = 1 to 4 map {
case i =>
TaskId(processorName, i.toString)
}
receiver ! result
case LoadTaskDetails(id, receiver) =>
val result = TaskDetails(id, "pending", "JSON")
receiver ! result
case DeleteTask(id, receiver) =>
receiver ! true
case MarkSuccess(id, receiver) =>
receiver ! true
}
}
object ProcessorActor {
sealed trait ProcessorActorMessage
case object Process extends ProcessorActorMessage
private case class TaskComplete(id: TaskId, isSuccess: Boolean) extends
ProcessorActorMessage
private case class ProcessingComplete(result: Seq[(TaskId, Boolean)]) extends
ProcessorActorMessage
}
class ProcessorActor(name: String, dao: ActorRef) extends Actor with
ActorLogging {
import com.episodetracker.boot.ProcessorActor._
override def receive = {
case Process =>
dao ! DaoActor.LoadTaskIds(name, context.actorOf(Props(new
TasksProcessorResultActor())))
case ProcessingComplete(result) =>
sender() ! PoisonPill
log.info(s"Processing complete with results [$result]")
case unknown =>
log.error(s"Unknown message [$unknown]")
}
val emptyResult = Seq[(TaskId, Boolean)]()
class TasksProcessorResultActor extends Actor {
context.setReceiveTimeout(3 seconds)
override def receive = default(0, emptyResult)
def default(awaitResults: Int, completedTasks: Seq[(TaskId, Boolean)]):
Receive = {
case tasks: Seq[TaskId] =>
tasks.foreach {
case task =>
context.actorOf(Props(new TaskProcessorActor(task)))
}
context become default(tasks.size, emptyResult)
case TaskComplete(id, isSuccess) =>
val results = completedTasks :+ ((id, isSuccess))
if (results.size == awaitResults) {
context.parent ! ProcessingComplete(results)
} else {
context become default(awaitResults, results)
}
case unknown =>
log.error(s"Unknown message [$unknown]")
}
}
class TaskProcessorActor(taskId: TaskId) extends Actor with ActorLogging {
override def receive = default
dao ! DaoActor.LoadTaskDetails(taskId, self)
def default: Receive = {
case taskDetails: TaskDetails =>
taskDetails.status match {
case "success" =>
dao ! DaoActor.DeleteTask(taskDetails.id, self)
context.become(waitDaoResponse)
case _ =>
//TODO: do some processing
dao ! DaoActor.MarkSuccess(taskDetails.id, self)
context.become(waitDaoResponse)
}
case unknown =>
log.error(s"Unknown message [$unknown]")
}
def waitDaoResponse: Receive = {
case true =>
context.parent ! TaskComplete(taskId, true)
case false =>
context.parent ! TaskComplete(taskId, false)
}
}
}
I think it's not perfect and if code in "//TODO: do some processing" need
some DAO call, actor code with become go to hell.
What in second sample can be corrected or improved?
With best regards, Vladimir.
среда, 11 марта 2015 г., 17:14:21 UTC+3 пользователь rrodseth написал:
>
> Hi Vladimir
>
> I have had good luck with using per-request actors as in the Net-a-porter
> activator template, and a replyTo:ActorRef in the messages sent to my
> "DbActor" (which still talks to a DAO)
>
> Some more detail in this thread
>
> http://qnalist.com/questions/5581605/using-tell-or-ask-with-akka-persistence-and-rest
>
> On Tue, Mar 10, 2015 at 2:55 PM, Владимир Морозов <[email protected]
> <javascript:>> wrote:
>
>> Hi All,
>>
>> this question about design, for example now I have some process that load
>> task list and try process each task, it looks like this:
>>
>> import scala.concurrent.{ExecutionContext, Future}
>>
>>
>> case class TaskId(processorName: String, id: String)
>>
>> case class TaskDetails(id: TaskId, status: String, json: String)
>>
>> trait DAO {
>> def loadTaskIds(processorName: String): Future[Seq[TaskId]]
>>
>> def loadTaskDetails(id: TaskId): Future[Option[TaskDetails]]
>>
>> def deleteTask(id: TaskId): Future[Boolean]
>>
>> def markSuccess(id: TaskId): Future[Boolean]
>> }
>>
>> class Processor(name: String, dao: DAO, implicit val ec: ExecutionContext) {
>> def process(): Unit = {
>> dao
>> .loadTaskIds(name)
>> .map(_.map {
>> case taskId =>
>> dao.loadTaskDetails(taskId).flatMap {
>> case None =>
>> delete(taskId)
>> case Some(task) if task.status == "success" =>
>> delete(taskId)
>> case Some(task) =>
>> doProcess(task).flatMap {
>> case true =>
>> dao.markSuccess(taskId).map {
>> case true =>
>> Some(task)
>> case false =>
>> // log.error(...)
>> None
>> }
>> case false =>
>> // log.error(...)
>> Future.successful(None)
>> }
>> }
>> }).map {
>> case processingFutures =>
>> Future.sequence(processingFutures).map(_.flatten).map {
>> case completedTasks =>
>> // log.info(s"Processing '$name' tasks, complete
>> [${completedTasks.map(_.id).mkString(", ")}]")
>> }
>> }
>> }
>>
>> private def doProcess(task: TaskDetails): Future[Boolean] = ???
>>
>> private def delete(taskId: TaskId): Future[Option[TaskDetails]] =
>> dao.deleteTask(taskId).map {
>> case true =>
>> None
>> case false =>
>> // log.error(...)
>> None
>> }
>> }
>>
>>
>> My question: how I need to change this code for getting 'True' Actor
>> based application without using futures or ask pattern.
>>
>> PS: I read that I need use only actor tell (!) but I can't understand how
>> I can rewrite my logic and DAO as an actor.
>>
>> With best regards, Vladimir.
>>
>> --
>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>> >>>>>>>>>> Check the FAQ:
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
>> ---
>> You received this message because you are subscribed to the Google Groups
>> "Akka User List" group.
>> To unsubscribe from this group and stop receiving emails from it, send an
>> email to [email protected] <javascript:>.
>> To post to this group, send email to [email protected]
>> <javascript:>.
>> Visit this group at http://groups.google.com/group/akka-user.
>> For more options, visit https://groups.google.com/d/optout.
>>
>
>
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ:
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.