Interesting idea, of course that violates the principle of not sending 
mutable state to an actor. I think it would only be necessary if there was 
significant overhead in creating and stopping the actors and Akka says that 
isn't the case. I think the only way this could work is if you sent an ask 
to the router and then the actor changed state, saved the original sender 
and then finished the ask when all data has been collected. So this idea 
realized would look something like the following. 

package actors;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.japi.Procedure;
import akka.pattern.PatternsCS;

import java.util.concurrent.CompletableFuture;

@SuppressWarnings("WeakerAccess")
public class RequestHandlerActor extends UntypedActor {
    private ResponseA a;
    private ResponseB b;
    private ResponseC c;
    private ActorRef originalSender;

    private Procedure<Object> idle = new Procedure<Object>() {
        @Override
        public void apply(final Object message) throws Exception {
            if (message instanceof StartProcessRequest) {
                originalSender = sender();
                getContext().become(busy);
                context().system().actorSelection("/user/A").tell(new 
RequestA(), getSelf());
            } else {
                unhandled(message);
            }
        }
    };

    private Procedure<Object> busy = new Procedure<Object>() {
        @Override
        public void apply(final Object message) throws Exception {
            if (message instanceof ResponseA) {
                a = (ResponseA) message;
                context().system().actorSelection("/user/B").tell(new 
RequestB(a.id), getSelf());
                context().system().actorSelection("/user/C").tell(new 
RequestC(a.id), getSelf());
            } else if (message instanceof ResponseB) {
                b = (ResponseB) message;
            } else if (message instanceof ResponseC) {
                c = (ResponseC) message;
            } else {
                unhandled(message);
            }
            if ((a != null && b != null && c != null)) {
                originalSender.tell(new FinalResponse(a, b, c), getSelf());
                originalSender = null;
                getContext().become(idle);
            }
        }
    };

    public static Props props(final CompletableFuture<FinalResponse> future) {
        return Props.create(RequestHandlerActor.class, future);
    }

    public static CompletableFuture<FinalResponse> invoke(final ActorSystem 
system) {
        return PatternsCS.ask(system.actorSelection("/user/myRouter"), new 
StartProcessRequest(), 60000)
                .thenApply(v -> (FinalResponse) v)
                .toCompletableFuture();
    }

    public RequestHandlerActor() {
    }

    @Override
    public void preStart() throws Exception {
        getContext().become(idle);
    }

    @Override
    public void onReceive(final Object message) throws Exception {
        unhandled(message);
    }

    public static class FinalResponse {
        public FinalResponse(final ResponseA a, final ResponseB b, final 
ResponseC c) {
            // Code Here
        }
    }

    public static class RequestA {
    }

    public static class ResponseA {
        public final int id;

        public ResponseA(final int id) {
            this.id = id;
        }
    }

    public static class RequestB {
        public final int id;

        public RequestB(final int id) {
            this.id = id;
        }
    }

    public static class ResponseB {
    }

    public static class RequestC {
        public final int id;

        public RequestC(final int id) {
            this.id = id;
        }
    }

    public static class ResponseC {
    }

    public static class StartProcessRequest {
    }
}




The only issue that I see with this is that the router is going to need to 
know which actors are busy and which aren't and I don't see how they can do 
that without sending a message to the actor itself which would be 
heavyweight if you have to poll all the actors to figure out which is 
available. Another thought would be to make the router just be a least 
mailbox router and then have the messages for processing a new request be 
stashed so they queue up to be handled next. 

The benefit of this is that we can control the actor pool size rather than 
have it grow according to load.

Ideas? Comments? Flames? 


On Wednesday, June 29, 2016 at 10:16:28 AM UTC-5, Guido Medina wrote:
>
> But it can be improved; instead of creating and destroying actors per 
> Future why not just create a pool of actors (or workers if you will),
> the recommended amount of concurrent actors you want is usually a factor 
> of CPUs, say CPUs x N where N in [1..4], of course,
> and if these actors use legacy code and have a chance to block create them 
> on a separate dispatcher.
>
> Once you create these actors put them in a RoundRobin router so that they 
> alternate (who will receive the next future) and send the Future to your 
> router.
> You can do all that programmatically, these actors can be created with a 
> Creator if you need to pass constructor parameter to wire them up with 
> other things,
> I will post later sample code of a router with workers if you haven't 
> figured it out, let me know if you do or don't in a couple of hours.
>
> HTH,
>
> Guido.
>
> On Wednesday, June 29, 2016 at 3:25:23 PM UTC+1, kraythe wrote:
>>
>> Thanks for the reply Mark. I understand where you are comming from but 
>> the actual implementation in proprietary code is quit a bit more complex. 
>> Future a response drives data needed for future b and c requests. The 
>> actual code with completable futures is significantly more complex. I would 
>> rather be able to do an ask and collect three responses before returning 
>> from the ask but I don't think there is a way to do that without encoding 
>> asks into the actor itself which seems worse than this. Currently the 
>> implementation works. I did add a receive timeout though. If there is a 
>> better way or something wrong with this way then I would change it
>
>

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to