Interesting idea, of course that violates the principle of not sending
mutable state to an actor. I think it would only be necessary if there was
significant overhead in creating and stopping the actors and Akka says that
isn't the case. I think the only way this could work is if you sent an ask
to the router and then the actor changed state, saved the original sender
and then finished the ask when all data has been collected. So this idea
realized would look something like the following.
package actors;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.japi.Procedure;
import akka.pattern.PatternsCS;
import java.util.concurrent.CompletableFuture;
@SuppressWarnings("WeakerAccess")
public class RequestHandlerActor extends UntypedActor {
private ResponseA a;
private ResponseB b;
private ResponseC c;
private ActorRef originalSender;
private Procedure<Object> idle = new Procedure<Object>() {
@Override
public void apply(final Object message) throws Exception {
if (message instanceof StartProcessRequest) {
originalSender = sender();
getContext().become(busy);
context().system().actorSelection("/user/A").tell(new
RequestA(), getSelf());
} else {
unhandled(message);
}
}
};
private Procedure<Object> busy = new Procedure<Object>() {
@Override
public void apply(final Object message) throws Exception {
if (message instanceof ResponseA) {
a = (ResponseA) message;
context().system().actorSelection("/user/B").tell(new
RequestB(a.id), getSelf());
context().system().actorSelection("/user/C").tell(new
RequestC(a.id), getSelf());
} else if (message instanceof ResponseB) {
b = (ResponseB) message;
} else if (message instanceof ResponseC) {
c = (ResponseC) message;
} else {
unhandled(message);
}
if ((a != null && b != null && c != null)) {
originalSender.tell(new FinalResponse(a, b, c), getSelf());
originalSender = null;
getContext().become(idle);
}
}
};
public static Props props(final CompletableFuture<FinalResponse> future) {
return Props.create(RequestHandlerActor.class, future);
}
public static CompletableFuture<FinalResponse> invoke(final ActorSystem
system) {
return PatternsCS.ask(system.actorSelection("/user/myRouter"), new
StartProcessRequest(), 60000)
.thenApply(v -> (FinalResponse) v)
.toCompletableFuture();
}
public RequestHandlerActor() {
}
@Override
public void preStart() throws Exception {
getContext().become(idle);
}
@Override
public void onReceive(final Object message) throws Exception {
unhandled(message);
}
public static class FinalResponse {
public FinalResponse(final ResponseA a, final ResponseB b, final
ResponseC c) {
// Code Here
}
}
public static class RequestA {
}
public static class ResponseA {
public final int id;
public ResponseA(final int id) {
this.id = id;
}
}
public static class RequestB {
public final int id;
public RequestB(final int id) {
this.id = id;
}
}
public static class ResponseB {
}
public static class RequestC {
public final int id;
public RequestC(final int id) {
this.id = id;
}
}
public static class ResponseC {
}
public static class StartProcessRequest {
}
}
The only issue that I see with this is that the router is going to need to
know which actors are busy and which aren't and I don't see how they can do
that without sending a message to the actor itself which would be
heavyweight if you have to poll all the actors to figure out which is
available. Another thought would be to make the router just be a least
mailbox router and then have the messages for processing a new request be
stashed so they queue up to be handled next.
The benefit of this is that we can control the actor pool size rather than
have it grow according to load.
Ideas? Comments? Flames?
On Wednesday, June 29, 2016 at 10:16:28 AM UTC-5, Guido Medina wrote:
>
> But it can be improved; instead of creating and destroying actors per
> Future why not just create a pool of actors (or workers if you will),
> the recommended amount of concurrent actors you want is usually a factor
> of CPUs, say CPUs x N where N in [1..4], of course,
> and if these actors use legacy code and have a chance to block create them
> on a separate dispatcher.
>
> Once you create these actors put them in a RoundRobin router so that they
> alternate (who will receive the next future) and send the Future to your
> router.
> You can do all that programmatically, these actors can be created with a
> Creator if you need to pass constructor parameter to wire them up with
> other things,
> I will post later sample code of a router with workers if you haven't
> figured it out, let me know if you do or don't in a couple of hours.
>
> HTH,
>
> Guido.
>
> On Wednesday, June 29, 2016 at 3:25:23 PM UTC+1, kraythe wrote:
>>
>> Thanks for the reply Mark. I understand where you are comming from but
>> the actual implementation in proprietary code is quit a bit more complex.
>> Future a response drives data needed for future b and c requests. The
>> actual code with completable futures is significantly more complex. I would
>> rather be able to do an ask and collect three responses before returning
>> from the ask but I don't think there is a way to do that without encoding
>> asks into the actor itself which seems worse than this. Currently the
>> implementation works. I did add a receive timeout though. If there is a
>> better way or something wrong with this way then I would change it
>
>
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ:
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.