Requirement:
- There has to be a long-running process (daemon) that runs forever.
- If it throws an exception, it should be restarted, but if it then fails
twice more, no further restart attempts should be made.
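To make the requirement concrete, here is a minimal, Akka-free sketch of the restart policy I have in mind. `RestartPolicySketch` and `runWithRestarts` are hypothetical names used only for illustration, and the sketch ignores the time window that a real supervisor strategy would apply.

```scala
import scala.util.{Failure, Success, Try}

object RestartPolicySketch {
  // Hypothetical helper (not part of Akka): runs `task`, starts it again
  // after a failure, and gives up once `maxRestarts` restarts are used up.
  // Returns how many times `task` was started in total.
  def runWithRestarts(maxRestarts: Int)(task: () => Unit): Int = {
    var starts = 0
    var restartsLeft = maxRestarts
    var done = false
    while (!done) {
      starts += 1
      Try(task()) match {
        case Success(_)                     => done = true        // finished normally
        case Failure(_) if restartsLeft > 0 => restartsLeft -= 1  // restart once more
        case Failure(_)                     => done = true        // no restarts left
      }
    }
    starts
  }
}
```

With `maxRestarts = 2`, a task that always fails would be started three times in total: the first start plus two restarts.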
Problem I face:
- The actor is restarted, but no message is sent to it again.
- As you can see in the logs, the message "I am a Marathon Runner!" is not
printed again after the restart.
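One thing that may or may not be related (this is an assumption on my part, not a confirmed cause): a Scala `Future` stored in a `val` runs its body once, when the enclosing object is constructed, while a `Future` created inside a `def` runs its body on every call. The Akka-free sketch below shows only that difference. `EagerRace` and `FreshRace` are hypothetical names that are not part of my project.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical class: the Future lives in a val, so its body is
// scheduled exactly once, when the instance is constructed.
class EagerRace {
  val runs = new AtomicInteger(0)
  private val future: Future[Int] = Future(runs.incrementAndGet())
  def start: Future[Int] = future // always hands back the same Future
}

// Hypothetical class: the Future is created inside the def, so its
// body is scheduled again on every call to start.
class FreshRace {
  val runs = new AtomicInteger(0)
  def start: Future[Int] = Future(runs.incrementAndGet())
}

object FutureValVsDefDemo extends App {
  val eager = new EagerRace
  Await.result(eager.start, 5.seconds)
  Await.result(eager.start, 5.seconds)
  println(s"EagerRace body ran ${eager.runs.get} time(s)")

  val fresh = new FreshRace
  Await.result(fresh.start, 5.seconds)
  Await.result(fresh.start, 5.seconds)
  println(s"FreshRace body ran ${fresh.runs.get} time(s)")
}
```

Calling `start` twice on each instance runs the `EagerRace` body once and the `FreshRace` body twice.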
My code looks like this:
*Main Class*
package com.learner.ahka.runforever

import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

object RaceEvent extends App {
  val config = ConfigFactory.parseString("""
    akka.loglevel = "DEBUG"
    akka.actor.debug {
      receive = on
      lifecycle = on
    }
    """)
  val system = ActorSystem.create("race", config)
  val coach = system.actorOf(Coach.props(), "coach")
  coach ! GetSetGo
}
*Supervisor*
package com.learner.ahka.runforever

import akka.actor.SupervisorStrategy.{Escalate, Restart}
import akka.actor._
import akka.event.LoggingReceive
import scala.concurrent.duration._

case object GetSetGo

object Coach {
  def props(): Props = Props[Coach]
}

class Coach() extends Actor with ActorLogging {
  val runner = context.actorOf(Runner.props(new Marathon), "runner")

  override def supervisorStrategy: SupervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 2, withinTimeRange = 5.seconds) {
      case _: RuntimeException => Restart
    }

  override def receive = LoggingReceive {
    case GetSetGo => runner ! GoForIt
  }
}
*Actor*
package com.learner.ahka.runforever

import akka.actor.Status.Failure
import akka.actor.{Actor, ActorLogging, Props}
import akka.event.LoggingReceive
import akka.pattern.pipe

object Runner {
  def props(race: Race) = Props(classOf[Runner], race)
}

class Runner(race: Race) extends Actor with ActorLogging {
  import context.dispatcher

  override def receive: Receive = LoggingReceive {
    case GoForIt => race.start pipeTo self
    case Failure(throwable) => throw throwable
  }
}
*Actual work*
package com.learner.ahka.runforever

import scala.concurrent.Future

case object GoForIt

trait Race {
  def start: Future[Any]
}

class Marathon extends Race {
  import scala.concurrent.ExecutionContext.Implicits.global

  override def start: Future[Any] = future

  val future = Future {
    for (i <- 1 to 3) {
      println("I am a Marathon Runner!")
      Thread.sleep(1000)
    }
    throw new RuntimeException("MarathonRunner is tired")
  }
}
*Logs*
[DEBUG] [05/30/2015 16:03:35.696] [main] [EventStream(akka://race)] logger log1-Logging$DefaultLogger started
[DEBUG] [05/30/2015 16:03:35.698] [main] [EventStream(akka://race)] Default Loggers started
[DEBUG] [05/30/2015 16:03:35.704] [race-akka.actor.default-dispatcher-4] [akka://race/system] now supervising Actor[akka://race/system/deadLetterListener#-1391310385]
[DEBUG] [05/30/2015 16:03:35.706] [race-akka.actor.default-dispatcher-3] [akka://race/system/deadLetterListener] started (akka.event.DeadLetterListener@191ba186)
[DEBUG] [05/30/2015 16:03:35.710] [race-akka.actor.default-dispatcher-2] [akka://race/user] now supervising Actor[akka://race/user/coach#-1161587711]
I am a Marathon Runner!
[DEBUG] [05/30/2015 16:03:35.722] [race-akka.actor.default-dispatcher-3] [akka://race/user/coach] started (com.learner.ahka.runforever.Coach@66f0f319)
[DEBUG] [05/30/2015 16:03:35.722] [race-akka.actor.default-dispatcher-4] [akka://race/user/coach/runner] started (com.learner.ahka.runforever.Runner@72f67980)
[DEBUG] [05/30/2015 16:03:35.723] [race-akka.actor.default-dispatcher-3] [akka://race/user/coach] now supervising Actor[akka://race/user/coach/runner#755574648]
[DEBUG] [05/30/2015 16:03:35.723] [race-akka.actor.default-dispatcher-3] [akka://race/user/coach] received handled message GetSetGo
[DEBUG] [05/30/2015 16:03:35.725] [race-akka.actor.default-dispatcher-4] [akka://race/user/coach/runner] received handled message GoForIt
I am a Marathon Runner!
I am a Marathon Runner!
[DEBUG] [05/30/2015 16:03:38.739] [race-akka.actor.default-dispatcher-3] [akka://race/user/coach/runner] received handled message Failure(java.lang.RuntimeException: MarathonRunner is tired)
[ERROR] [05/30/2015 16:03:38.752] [race-akka.actor.default-dispatcher-4] [akka://race/user/coach/runner] MarathonRunner is tired
java.lang.RuntimeException: MarathonRunner is tired
    at com.learner.ahka.runforever.Marathon$$anonfun$1.apply(Race.scala:22)
    at com.learner.ahka.runforever.Marathon$$anonfun$1.apply(Race.scala:17)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[DEBUG] [05/30/2015 16:03:38.753] [race-akka.actor.default-dispatcher-2] [akka://race/user/coach/runner] restarting
[DEBUG] [05/30/2015 16:03:38.755] [race-akka.actor.default-dispatcher-2] [akka://race/user/coach/runner] restarted
Can someone please spot what I am missing?
Thank you