I think a possible solution might look like this:
import scala.collection.mutable.Queue
import scala.concurrent.duration._
import akka.actor._
import akka.routing._
object State extends Enumeration {
val Unknown, Created, Sent = Value
}
trait Action
case class Unknown() extends Action
case class Init() extends Action
case class Check() extends Action
case class Send() extends Action
case class Enqueue(actor: (ActorContext) => ActorRef, message: Action)
class QueueRouter(val size: Int) extends Actor {
private val waiting: Queue[((ActorContext) => ActorRef, Any)] =
Queue.empty
private var running: Int = 0
override def receive = {
case command: Enqueue => {
println("OnEnqueue -> QueueRouter(waiting: %s, running: %s)".format(
waiting.size, running))
waiting.enqueue((command.actor, command.message))
if (running <= size) {
self ! Dequeue()
}
}
case command: Dequeue => {
println("OnDequeue -> QueueRouter(waiting: %s, running: %s)".format(
waiting.size, running))
if (waiting.nonEmpty) {
running += 1
val (actor, message) = waiting.dequeue
val actorRef = actor(context)
context.watch(actorRef)
actorRef ! message
}
}
case command: Terminated => {
println("OnTerminated -> QueueRouter(waiting: %s, running: %s)"
.format(waiting.size, running - 1))
running -= 1
context.unwatch(command.getActor)
self ! Dequeue()
}
}
override val supervisorStrategy = {
// TODO: Decide you supervision strategy
OneForOneStrategy(maxNrOfRetries = 1, withinTimeRange = 1 minute) {
case t: Throwable => SupervisorStrategy.Stop
}
}
private case class Dequeue()
}
class DummyActor(id: Int) extends Actor with FSM[State.Value, Action] {
startWith(State.Unknown, Unknown())
when(State.Unknown) {
case Event(current: Unknown, previous) => {
stay
}
case Event(current: Init, previous) => {
self ! Check()
goto(State.Created)
}
}
when(State.Created) {
case Event(current: Check, previous) => {
self ! Send()
goto(State.Sent)
}
}
when(State.Sent) {
case Event(current: Send, previous) => {
//println("Sent(%s): send start".format(id))
Thread.sleep(3000)
println("Sent(%s): send stop".format(id))
stop
}
}
initialize
}
object TestQueueRouter {
def factory(props: Props): (ActorContext) => ActorRef = {
def delegate(context: ActorContext) = {
context.actorOf(props)
}
delegate _
}
def main(args: Array[String]) = {
implicit val system = ActorSystem("main")
val router = Props(new
QueueRouter(2)).withRouter(RoundRobinRouter(nrOfInstances
= 2))
val pooler = system.actorOf(router)
for (i <- 0 until 10) {
pooler ! Enqueue(factory(Props(new DummyActor(i))), Init())
}
println("OK")
}
}
On Thursday, December 5, 2013 7:24:41 PM UTC+1, Eugene Dzhurinsky wrote:
>
> Hello!
>
> I want to convert some set of actors into FSM using Akka FSM. Currently
> system is designed in the way that every actor knows what to do with
> results of it's action and which actor is next in sequence of processing.
>
> Now I want to have some sort of dedicated actors, which are doing only
> things they should know (and now know about entire message routing), and
> central FSM, which knows how to route messages and process transformation
> flow.
>
> So overall idea is pictured at http://i.imgur.com/Sxib6dN.png
>
> Client sends some request to FSM actor, FSM actor - on transition to next
> state - sends message to some actor in onTransition block. That actor
> replies to sender with some message, which is processed inside FSM state
> somehow until request is finished.
>
> So far everything looks good, however I'm not sure what will happen if
> multiple clients will start interaction with FSM actor. Will the "workflow"
> be recorded somewhere, so flows from different clients won't collide at
> some point (like, FSM actor receives message from another client instead of
> originating one)?
>
> Is it safe to have say 10 FSM actors and round-robin router, or I need to
> create new FSM actor on every request from client, and then kill it once
> finished?
>
> Thanks!
>
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ:
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.