A couple of nights ago I met Jamie Allen at his talk about Akka in NYC, and
he suggested that the solution to this problem is to add a supervisor actor
between the service actor and the worker actor. The supervisor stores the
context of the request so that it is still available in case of a fault.
That fixes the problem. Below is the full code for those who want to use
this pattern:
package actors;
import akka.actor.*;
import akka.japi.pf.DeciderBuilder;
import akka.japi.pf.ReceiveBuilder;
import play.Logger;
import scala.concurrent.duration.Duration;
import static akka.actor.SupervisorStrategy.resume;
import static akka.actor.SupervisorStrategy.restart;
import static akka.actor.SupervisorStrategy.stop;
import static akka.actor.SupervisorStrategy.escalate;
public class FaultTolerantServiceActor extends AbstractActor {
private int count = 0;
/**
 * Installs the message handler of the service actor.
 *
 * <p>For every {@link ToUpperRequest} the request counter is incremented, a new
 * {@link SupervisorActor} child is created with the current counter value, and
 * the request is forwarded to it. Messages of any other type are only logged
 * as errors.
 */
public FaultTolerantServiceActor() {
    receive(
        ReceiveBuilder
            .match(FaultTolerantServiceActor.ToUpperRequest.class, request -> {
                // Log which actor instance and which thread picked up the request.
                Logger.debug("About service: " + self().toString()
                    + " Thread: " + Thread.currentThread().getName());
                count++;
                final ActorRef supervisor =
                    context().actorOf(Props.create(SupervisorActor.class, count));
                // forward (rather than tell) so the supervisor sees the original sender.
                supervisor.forward(request, context());
            })
            .matchAny(obj -> {
                Logger.error("I cannot handle objects of type: " + obj.getClass().getName());
            })
            .build()
    );
}
public static class SupervisorActor extends AbstractActor {
private ActorRef sender;
private Object request;
private SupervisorStrategy strategy =
new OneForOneStrategy(10, Duration.create("1 minute"),
DeciderBuilder.
match(IllegalArgumentException.class, e -> {
sender.tell(new FailureToUpResponse(request,
"Cannot handle the string 'error'!"), self());
return stop();
}).
matchAny(o -> escalate()).build());
@Override
public SupervisorStrategy supervisorStrategy() {
return strategy;
}
public SupervisorActor(int count) {
receive(
ReceiveBuilder.matchAny(obj -> {
sender = sender();
request = obj;
final ActorRef worker =
context().actorOf(Props.create(Worker.class, count));
worker.forward(obj, context());
}).build()
);
}
}
public static class Worker extends AbstractActor {
public Worker(int count) {
receive(
ReceiveBuilder.match(FaultTolerantServiceActor.ToUpperRequest.class,
request -> {
Logger.debug("About me: " + self().toString() + "
Thread: " + Thread.currentThread().getName());
if(request.input.equals("error")) throw new
IllegalArgumentException("Cannot handle the word error!");
Thread.sleep(5000);
Logger.debug("Did the work: " +
request.input.toUpperCase() + "-" + count);
final ToUpperResponse response = new
ToUpperResponse(request.input, request.input.toUpperCase() + "-" + count);
sender().tell(response, self());
context().stop(self());
}).build()
);
}
}
/**
 * Request message carrying the string that should be upper-cased.
 */
public static class ToUpperRequest {

    /** The text to convert. */
    public final String input;

    /**
     * @param input the text to convert; stored as given
     */
    public ToUpperRequest(String input) {
        this.input = input;
    }
}
/**
 * Reply message pairing the original input with the produced output.
 */
public static class ToUpperResponse {

    /** The text that was sent in the request. */
    public final String input;

    /** The result produced for {@link #input}. */
    public final String output;

    /**
     * @param input  the text that was sent in the request; stored as given
     * @param output the result produced for it; stored as given
     */
    public ToUpperResponse(String input, String output) {
        this.input = input;
        this.output = output;
    }
}
/**
 * Reply message reporting that a request could not be processed.
 */
public static class FailureToUpResponse {

    /** The message that could not be processed. */
    public final Object request;

    /** Human-readable description of the failure. */
    public final String message;

    /**
     * @param request the message that could not be processed; stored as given
     * @param message description of the failure; stored as given
     */
    public FailureToUpResponse(Object request, String message) {
        this.request = request;
        this.message = message;
    }
}
}
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ:
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.