I tried to model this problem and build out a structure where a
long-running work is given to dispatcher (I believe it should be a separate
dispatcher than to use a global one). But in this case, the supervisor
never gets the failure message and never applies strategy on it.  Here is
my code

package com.learner.ahka.runforever

import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

object RaceEvent extends App {
  val config = ConfigFactory.parseString( """
    akka.loglevel = "DEBUG"
    akka.actor.debug {
      receive = on
      lifecycle = on
    }
                                          """)
  val system = ActorSystem.create("race", config)
  val coach = system.actorOf(Coach.props(), "coach")
  coach ! GetSetGo
}


package com.learner.ahka.runforever

import akka.actor.SupervisorStrategy.Restart
import akka.actor._
import akka.event.LoggingReceive

import scala.concurrent.duration._

case object GetSetGo

object Coach {
  def props(): Props = Props[Coach];
}

class Coach() extends Actor with ActorLogging {

  val runner = context.actorOf(Runner.props(new Marathon), "runner")

  override def supervisorStrategy: SupervisorStrategy =
OneForOneStrategy(maxNrOfRetries = 2, withinTimeRange = 5 seconds) {
    case _: RuntimeException => Restart
  }

  override def receive = LoggingReceive {
    case GetSetGo => runner ! GoForIt
  }
}


package com.learner.ahka.runforever

import akka.actor.{Actor, ActorLogging, Props}
import akka.event.LoggingReceive
import scala.concurrent.ExecutionContext.Implicits.global

object Runner {
  def props(race: Race) = Props(classOf[Runner], race)
}

class Runner(race: Race) extends Actor with ActorLogging {

  override def receive: Receive = LoggingReceive {
    case GoForIt => {
      race.start onFailure {
        case e: RuntimeException => throw e
      }
    }
  }
}


package com.learner.ahka.runforever

import scala.concurrent.Future

case object GoForIt

trait Race {
  def start: Future[Any]
}

class Marathon extends Race {

  import scala.concurrent.ExecutionContext.Implicits.global

  override def start: Future[Any] = future

  val future = Future {
    for (i <- 1 to 3) {
      println("I am a Marathon Runner!")
      Thread.sleep(1000)
    }
    throw new RuntimeException("MarathonRunner is tired")
  }
}

Can someone help me explain what am I doing wrong here?

The logs that I see are

[DEBUG] [05/30/2015 15:02:01.364] [main] [EventStream(akka://race)] logger
log1-Logging$DefaultLogger started
[DEBUG] [05/30/2015 15:02:01.365] [main] [EventStream(akka://race)] Default
Loggers started
[DEBUG] [05/30/2015 15:02:01.370] [race-akka.actor.default-dispatcher-3]
[akka://race/system] now supervising
Actor[akka://race/system/deadLetterListener#-325855940]
[DEBUG] [05/30/2015 15:02:01.371] [race-akka.actor.default-dispatcher-2]
[akka://race/system/deadLetterListener] started
(akka.event.DeadLetterListener@171fdafb)
[DEBUG] [05/30/2015 15:02:01.375] [race-akka.actor.default-dispatcher-4]
[akka://race/user] now supervising Actor[akka://race/user/coach#1377762623]
I am a Marathon Runner!
[DEBUG] [05/30/2015 15:02:01.388] [race-akka.actor.default-dispatcher-2]
[akka://race/user/coach] started (com.learner.ahka.runforever.Coach@71538e2c
)
[DEBUG] [05/30/2015 15:02:01.388] [race-akka.actor.default-dispatcher-3]
[akka://race/user/coach/runner] started
(com.learner.ahka.runforever.Runner@1571c2a0)
[DEBUG] [05/30/2015 15:02:01.388] [race-akka.actor.default-dispatcher-2]
[akka://race/user/coach] now supervising
Actor[akka://race/user/coach/runner#1938289506]
[DEBUG] [05/30/2015 15:02:01.389] [race-akka.actor.default-dispatcher-2]
[akka://race/user/coach] received handled message GetSetGo
[DEBUG] [05/30/2015 15:02:01.390] [race-akka.actor.default-dispatcher-4]
[akka://race/user/coach/runner] received handled message GoForIt
I am a Marathon Runner!
I am a Marathon Runner!
java.lang.RuntimeException: MarathonRunner is tired
at com.learner.ahka.runforever.Marathon$$anonfun$1.apply(Race.scala:22)
at com.learner.ahka.runforever.Marathon$$anonfun$1.apply(Race.scala:17)
at
scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at
scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at
scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Thank you very much

On Fri, May 29, 2015 at 9:46 PM, Harit Himanshu <
[email protected]> wrote:

> Considering the following Scenario
>
> CoachSupervisor
>>             |
>> MarathonRunnerActor
>
>
> Does that mean following?
>
>    - MarathonRunnerActor will start a new future by giving a custom
>    dispatcher of type PinnedDispatcher?
>    - If so, then in case of any failure how would CoachSupervisor be
>    notified? He is monitoring MarathonRunnerActor and not future (future is
>    not Actor)
>
> I am confused
>
> On Fri, May 29, 2015 at 9:47 AM, Harit Himanshu <
> [email protected]> wrote:
>
>> By that do you mean that the main() is never returning? If so, then
>> you're blocking the Actor and wasting an entire thread.
>> Spawn this off on a dedicated dispatcher instead (see dispatchers and
>> futures docs).
>>
>> Yes, this is suppose to run always, and even if it fails, it needs to
>> restart, which is why I was thinking to assign 1 actor to do this job
>> forever. (Pardon my understanding since I am very new to Akka and don't
>> know how to do this properly).
>> I will read up for how to spawn it on a dedicated dispatcher, but my
>> question is if it would fail, how would the supervisor know and restart it?
>>
>> Any ideas?
>>
>> How would you recommend such a method otherwise?
>>>
>> Extract an interface (trait) and in tests provide a NoopImplementation
>> instance of it.
>>
>> - Regarding Mocking, I would consider your advice and try to stay away
>> from it, I need to learn how to use your advice (I am new to this entire
>> ecosystem)
>>
>> On Fri, May 29, 2015 at 9:38 AM, Konrad Malawski <[email protected]>
>> wrote:
>>
>>>  def run: Unit = LogReaderDisruptor.main(Array())
>>>> is a method that is supposed to run forever, plus it required some
>>>> setup (that is available on client's machine or test environment),
>>>>
>>> By that do you mean that the main() is never returning? If so, then
>>> you're blocking the Actor and wasting an entire thread.
>>> Spawn this off on a dedicated dispatcher instead (see dispatchers and
>>> futures docs).
>>>
>>>
>>>> it was not trivial to set it up for Unit Test, which is why I mocked
>>>> that part out
>>>>
>>> The overall technique is very good, I'm just reserved on using mocking
>>> tools with concurrent code AFAIR this yields suprising results sometimes.
>>>
>>> How would you recommend such a method otherwise?
>>>>
>>> Extract an interface (trait) and in tests provide a NoopImplementation
>>> instance of it.
>>>
>>> -- Konrad
>>>
>>> --
>>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>>> >>>>>>>>>> Check the FAQ:
>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>> >>>>>>>>>> Search the archives:
>>> https://groups.google.com/group/akka-user
>>> ---
>>> You received this message because you are subscribed to a topic in the
>>> Google Groups "Akka User List" group.
>>> To unsubscribe from this topic, visit
>>> https://groups.google.com/d/topic/akka-user/_qV5b2zpx74/unsubscribe.
>>> To unsubscribe from this group and all its topics, send an email to
>>> [email protected].
>>> To post to this group, send email to [email protected].
>>> Visit this group at http://groups.google.com/group/akka-user.
>>> For more options, visit https://groups.google.com/d/optout.
>>>
>>
>>
>

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to