Hey there!
This is what I am trying to do:
- Have a Supervisor monitor an Actor. The Actor should run forever and be
restarted if it fails.
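import akka.actor.{Actor, ActorLogging, OneForOneStrategy, Props, SupervisorStrategy}
import akka.actor.Status.Failure
import akka.actor.SupervisorStrategy.Restart
import akka.event.LoggingReceive
import scala.concurrent.duration._
import scala.language.postfixOps

// GetSetGo, Race and Marathon are as in the second listing further down.
// GoForIt is assumed to be a plain case object; its definition was not posted.
case object GoForIt

object Coach {
  def props(): Props = Props[Coach]
}

class Coach() extends Actor with ActorLogging {
  val runner = context.actorOf(Runner.props(new Marathon), "runner")

  // Restart the Runner on RuntimeException, at most once within 2 seconds.
  override def supervisorStrategy: SupervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 1, withinTimeRange = 2 seconds) {
      case _: RuntimeException => Restart
    }

  override def receive = LoggingReceive {
    case GetSetGo => runner ! GoForIt
    case "runner restarted" =>
      println("runner restarted, sending message again")
      runner ! GoForIt
  }
}

object Runner {
  def props(race: Race) = Props(classOf[Runner], race)
}

class Runner(race: Race) extends Actor with ActorLogging {
  import context.dispatcher

  // Tell the Coach that this Runner has been restarted.
  @throws[Exception](classOf[Exception])
  override def postRestart(reason: Throwable): Unit =
    context.parent ! "runner restarted"

  override def receive: Receive = LoggingReceive {
    case GoForIt => {
      println("running long job")
      for (i <- 1 to 3) {
        Thread.sleep(200)
      }
      throw new RuntimeException("MarathonRunner is tired")
    }
    case Failure(throwable) => throw throwable
  }
}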
When I run this, I can see in the logs that the Runner crashes and, as per the
SupervisorStrategy, it is restarted 1 time and then stopped:
[DEBUG] [05/30/2015 19:23:44.785] [main] [EventStream(akka://race)] logger log1-Logging$DefaultLogger started
[DEBUG] [05/30/2015 19:23:44.786] [main] [EventStream(akka://race)] Default Loggers started
[DEBUG] [05/30/2015 19:23:44.789] [race-akka.actor.default-dispatcher-3] [akka://race/system] now supervising Actor[akka://race/system/deadLetterListener#-892788249]
[DEBUG] [05/30/2015 19:23:44.791] [race-akka.actor.default-dispatcher-2] [akka://race/system/deadLetterListener] started (akka.event.DeadLetterListener@50b0d70d)
[DEBUG] [05/30/2015 19:23:44.793] [race-akka.actor.default-dispatcher-4] [akka://race/user] now supervising Actor[akka://race/user/coach#513818446]
*running long job*
[DEBUG] [05/30/2015 19:23:44.802] [race-akka.actor.default-dispatcher-2] [akka://race/user/coach] started (com.learner.ahka.runforever.Coach@3db89173)
[DEBUG] [05/30/2015 19:23:44.802] [race-akka.actor.default-dispatcher-3] [akka://race/user/coach/runner] started (com.learner.ahka.runforever.Runner@ed6aa7d)
[DEBUG] [05/30/2015 19:23:44.803] [race-akka.actor.default-dispatcher-2] [akka://race/user/coach] now supervising Actor[akka://race/user/coach/runner#80737681]
[DEBUG] [05/30/2015 19:23:44.803] [race-akka.actor.default-dispatcher-2] [akka://race/user/coach] received handled message GetSetGo
*running long job*
[DEBUG] [05/30/2015 19:23:44.804] [race-akka.actor.default-dispatcher-4] [akka://race/user/coach/runner] received handled message GoForIt
[ERROR] [05/30/2015 19:23:45.414] [race-akka.actor.default-dispatcher-3] [akka://race/user/coach/runner] MarathonRunner is tired
java.lang.RuntimeException: MarathonRunner is tired
    at com.learner.ahka.runforever.Runner$$anonfun$receive$1.applyOrElse(Runner.scala:26)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at akka.event.LoggingReceive.apply(LoggingReceive.scala:62)
    at akka.event.LoggingReceive.apply(LoggingReceive.scala:50)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.event.LoggingReceive.applyOrElse(LoggingReceive.scala:50)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at com.learner.ahka.runforever.Runner.aroundReceive(Runner.scala:12)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
    at akka.dispatch.Mailbox.run(Mailbox.scala:221)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[DEBUG] [05/30/2015 19:23:45.414] [race-akka.actor.default-dispatcher-2] [akka://race/user/coach/runner] restarting
*runner restarted, sending message again*
[DEBUG] [05/30/2015 19:23:45.415] [race-akka.actor.default-dispatcher-2] [akka://race/user/coach/runner] *restarted*
[DEBUG] [05/30/2015 19:23:45.416] [race-akka.actor.default-dispatcher-4] [akka://race/user/coach] received handled message runner restarted
*running long job*
[DEBUG] [05/30/2015 19:23:45.416] [race-akka.actor.default-dispatcher-2] [akka://race/user/coach/runner] received handled message GoForIt
[ERROR] [05/30/2015 19:23:46.026] [race-akka.actor.default-dispatcher-3] [akka://race/user/coach/runner] MarathonRunner is tired
java.lang.RuntimeException: MarathonRunner is tired
    at com.learner.ahka.runforever.Runner$$anonfun$receive$1.applyOrElse(Runner.scala:26)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at akka.event.LoggingReceive.apply(LoggingReceive.scala:62)
    at akka.event.LoggingReceive.apply(LoggingReceive.scala:50)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.event.LoggingReceive.applyOrElse(LoggingReceive.scala:50)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at com.learner.ahka.runforever.Runner.aroundReceive(Runner.scala:12)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
    at akka.dispatch.Mailbox.run(Mailbox.scala:221)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[DEBUG] [05/30/2015 19:23:46.032] [race-akka.actor.default-dispatcher-2] [akka://race/user/coach/runner] *stopped*
*Side Note*: I do not know why, in the normal case, the Runner appears to
receive the message 2 times (*running long job* is printed twice) before
crashing.
However, as per guidance
<https://groups.google.com/d/msg/akka-user/_qV5b2zpx74/0KAqQKiog_wJ> from
Konrad Malawski, I tried to delegate the actual long-running task to a
Future. The problem that I see is:
- When the Actor starts the first time, the Future kicks off, crashes as
expected, and the failure is handled correctly by the supervisor.
- After the restart, however, the Runner receives the message and logs its
statement, but the Future block is not executed.
We can see that from the code and logs below:
import akka.actor.{Actor, ActorLogging, OneForOneStrategy, Props, SupervisorStrategy}
import akka.actor.Status.Failure
import akka.actor.SupervisorStrategy.Restart
import akka.event.LoggingReceive
import akka.pattern.pipe
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.language.postfixOps

case object GetSetGo

object Coach {
  def props(): Props = Props[Coach]
}

class Coach() extends Actor with ActorLogging {
  val runner = context.actorOf(Runner.props(new Marathon), "runner")

  // Restart the Runner on RuntimeException, at most once within 2 seconds.
  override def supervisorStrategy: SupervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 1, withinTimeRange = 2 seconds) {
      case _: RuntimeException => Restart
    }

  override def receive = LoggingReceive {
    case GetSetGo => runner ! "GoForIt"
    case "runner restarted" =>
      println("runner restarted, sending message again")
      runner ! "GoForIt"
  }
}

object Runner {
  def props(race: Race) = Props(classOf[Runner], race)
}

class Runner(race: Race) extends Actor with ActorLogging {
  import context.dispatcher // execution context used by pipeTo

  // Tell the Coach that this Runner has been restarted.
  @throws[Exception](classOf[Exception])
  override def postRestart(reason: Throwable): Unit =
    context.parent ! "runner restarted"

  override def receive: Receive = LoggingReceive {
    case "GoForIt" =>
      println("I am starting to run")
      // Send the Future's result (or a Failure wrapping its exception) to this actor.
      race.start pipeTo self
    // Rethrow the piped failure so that the supervisor sees it.
    case Failure(throwable) => throw throwable
  }
}

trait Race {
  def start: Future[Any]
}

class Marathon extends Race {
  import scala.concurrent.ExecutionContext.Implicits.global

  override def start: Future[Any] = future

  val future = Future {
    println("running long job")
    for (i <- 1 to 3) {
      Thread.sleep(200)
    }
    throw new RuntimeException("MarathonRunner is tired")
  }
}
What I see in the logs now is:
[DEBUG] [05/30/2015 19:25:57.278] [main] [EventStream(akka://race)] logger log1-Logging$DefaultLogger started
[DEBUG] [05/30/2015 19:25:57.278] [main] [EventStream(akka://race)] Default Loggers started
[DEBUG] [05/30/2015 19:25:57.286] [race-akka.actor.default-dispatcher-3] [akka://race/system] now supervising Actor[akka://race/system/deadLetterListener#216015246]
[DEBUG] [05/30/2015 19:25:57.288] [race-akka.actor.default-dispatcher-4] [akka://race/system/deadLetterListener] started (akka.event.DeadLetterListener@6639712)
[DEBUG] [05/30/2015 19:25:57.290] [race-akka.actor.default-dispatcher-2] [akka://race/user] now supervising Actor[akka://race/user/coach#-1729867088]
*running long job*
[DEBUG] [05/30/2015 19:25:57.303] [race-akka.actor.default-dispatcher-4] [akka://race/user/coach] started (com.learner.ahka.runforever.Coach@63249c5)
[DEBUG] [05/30/2015 19:25:57.303] [race-akka.actor.default-dispatcher-3] [akka://race/user/coach/runner] started (com.learner.ahka.runforever.Runner@7a261495)
[DEBUG] [05/30/2015 19:25:57.303] [race-akka.actor.default-dispatcher-4] [akka://race/user/coach] now supervising Actor[akka://race/user/coach/runner#-797067975]
[DEBUG] [05/30/2015 19:25:57.304] [race-akka.actor.default-dispatcher-4] [akka://race/user/coach] received handled message GetSetGo
*I am starting to run*
[DEBUG] [05/30/2015 19:25:57.305] [race-akka.actor.default-dispatcher-2] [akka://race/user/coach/runner] received handled message GoForIt
[DEBUG] [05/30/2015 19:25:57.914] [race-akka.actor.default-dispatcher-3] [akka://race/user/coach/runner] received handled message Failure(java.lang.RuntimeException: MarathonRunner is tired)
[ERROR] [05/30/2015 19:25:57.919] [race-akka.actor.default-dispatcher-2] [akka://race/user/coach/runner] MarathonRunner is tired
java.lang.RuntimeException: MarathonRunner is tired
    at com.learner.ahka.runforever.Marathon$$anonfun$1.apply(Race.scala:22)
    at com.learner.ahka.runforever.Marathon$$anonfun$1.apply(Race.scala:17)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[DEBUG] [05/30/2015 19:25:57.919] [race-akka.actor.default-dispatcher-4] [akka://race/user/coach/runner] restarting
*runner restarted, sending message again*
[DEBUG] [05/30/2015 19:25:57.922] [race-akka.actor.default-dispatcher-4] [akka://race/user/coach/runner] *restarted*
*I am starting to run*
[DEBUG] [05/30/2015 19:25:57.922] [race-akka.actor.default-dispatcher-3] [akka://race/user/coach] received handled message runner restarted
[DEBUG] [05/30/2015 19:25:57.922] [race-akka.actor.default-dispatcher-4] [akka://race/user/coach/runner] received handled message GoForIt
[DEBUG] [05/30/2015 19:25:57.922] [race-akka.actor.default-dispatcher-4] [akka://race/user/coach/runner] received handled message Failure(java.lang.RuntimeException: MarathonRunner is tired)
[ERROR] [05/30/2015 19:25:57.923] [race-akka.actor.default-dispatcher-2] [akka://race/user/coach/runner] MarathonRunner is tired
java.lang.RuntimeException: MarathonRunner is tired
    at com.learner.ahka.runforever.Marathon$$anonfun$1.apply(Race.scala:22)
    at com.learner.ahka.runforever.Marathon$$anonfun$1.apply(Race.scala:17)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[DEBUG] [05/30/2015 19:25:57.925] [race-akka.actor.default-dispatcher-3] [akka://race/user/coach/runner] *stopped*
As we can see:
- the statement in the Future (running long job) is printed once, before
crashing
- the statement in the Runner (I am starting to run) is printed twice, once
before and once after the restart
- the Future is not executed after the Runner is restarted
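One thing I am not sure about is whether this comes from how Marathon stores
its Future. Below is a minimal sketch in plain Scala (no Akka) comparing a
Future kept in a val with a Future created inside a def. The names
FutureValVsDef, EagerRace, FreshRace and the run counters are hypothetical and
only for illustration. I am assuming this is relevant because Marathon keeps
its Future in a val, and the same Marathon instance is passed to the Runner
through Props on every restart.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FutureValVsDef {
  // Hypothetical stand-in for Marathon: the Future is a val, so its body is
  // scheduled once, when the object is constructed.
  class EagerRace(runs: AtomicInteger) {
    val future: Future[Int] = Future { runs.incrementAndGet() }
    def start: Future[Int] = future // always returns the same Future
  }

  // Hypothetical variant: the Future is created inside the def, so every
  // call to start schedules the body again.
  class FreshRace(runs: AtomicInteger) {
    def start: Future[Int] = Future { runs.incrementAndGet() }
  }

  // Calls start twice on each variant, as a restarted Runner would, and
  // returns how many times each Future body actually ran.
  def demo(): (Int, Int) = {
    val eagerRuns = new AtomicInteger(0)
    val freshRuns = new AtomicInteger(0)
    val eager = new EagerRace(eagerRuns)
    val fresh = new FreshRace(freshRuns)
    Await.result(eager.start, 5.seconds)
    Await.result(eager.start, 5.seconds)
    Await.result(fresh.start, 5.seconds)
    Await.result(fresh.start, 5.seconds)
    (eagerRuns.get, freshRuns.get)
  }
}
```

In this sketch FutureValVsDef.demo() returns (1, 2): the val-based body runs
once no matter how often start is called, while the def-based body runs on
every call. If the same applies to Marathon, the second pipeTo would only see
the Future that has already failed.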
Please help me understand what I am doing wrong.
Thank you very much