Sorry for the late reply. The code is below:
public class RequestConsumer extends UntypedConsumerActor {

    public String getEndpointUri() {
        return "jetty:http://localhost:8877/vas-gateway";
    }

    public void onReceive(Object message) throws Exception {
        .....
        // preserve the ActorRef through multiple actors
        requestDTO.setEndpointActorRef(getSelf());
        responseDTO = processRequest(requestDTO);
        jsonResponse =
            mapper.writerWithDefaultPrettyPrinter().writeValueAsString(responseDTO);
        getSender().tell(jsonResponse, getSelf());
    }

    ResponseDTO processRequest(RequestDTO requestDTO) throws Exception {
        Future futureResponse = Patterns.ask(requestProcessorActor,
            RequestProcessorActor.ProcessRequestMsg.builder()
                .requestDTO(requestDTO).build(),
            Timeout.durationToTimeout(FiniteDuration.create(5, TimeUnit.SECONDS)));
        ResponseDTO responseDTO = awaitResponse(futureResponse);
        if (futureResponse.isCompleted()) {
            return responseDTO;
        } else {
            return timeoutResponse(requestDTO);
        }
    }

    ResponseDTO awaitResponse(Future futureResponse) throws Exception {
        val resp = (ConsumerNotification) Await.result(futureResponse,
            Duration.create(5, TimeUnit.SECONDS));
        return resp.getResponseDTO();
    }
}
.....
public class RequestProcessorActor extends UntypedActor {

    public void onReceive(Object msg) throws Exception {
        // do a lot of processing here, calling PersistenceActor, PaymentsActor,
        // MerchantActor, then reply
        .....
        replyToEndpointActor(responseDTO);
    }

    private void replyToEndpointActor(ResponseDTO responseDTO) {
        ActorRef endpointActorRef =
            responseDTO.getRequestDTO().getEndpointActorRef();
        Patterns.pipe(Futures.successful(responseDTO), getContext().dispatcher())
            .pipeTo(endpointActorRef,
                responseDTO.getRequestDTO().getProcessActorRef());
        log.info("Reply Sent to Actor : " + endpointActorRef);
    }
}
I would also like to know whether RequestConsumer is a single actor or
whether it can have multiple instances to scale to multiple clients. Thanks
in advance for the help, guys!
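
For what it's worth, here is a minimal plain-Java sketch (no Akka) of the symptom I am seeing. It is only an analogy, assuming that an actor's mailbox behaves like a single-threaded executor that handles one message at a time. The class name MailboxBlockingSketch, the method handleRequest and the 500 ms timeout are made up for illustration and are not part of my real code. When the reply is routed through the mailbox of the handler that is blocking on it, the wait times out; when the reply completes the future directly, it does not.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class MailboxBlockingSketch {

    /**
     * Simulates one request. Returns true if the blocking wait timed out
     * before the reply was seen, false if the reply arrived in time.
     * Each single-threaded executor stands in for one actor's mailbox
     * (an assumption made for this sketch, not Akka's actual machinery).
     */
    static boolean handleRequest(boolean replyViaConsumerMailbox) throws Exception {
        ExecutorService consumerMailbox = Executors.newSingleThreadExecutor();
        ExecutorService processorMailbox = Executors.newSingleThreadExecutor();
        CompletableFuture<Boolean> timedOut = new CompletableFuture<>();
        try {
            // The consumer's message handler: ask the processor, then block on the answer.
            consumerMailbox.execute(() -> {
                CompletableFuture<String> askFuture = new CompletableFuture<>();
                processorMailbox.execute(() -> {
                    if (replyViaConsumerMailbox) {
                        // The reply is queued behind the handler that is still blocking.
                        consumerMailbox.execute(() -> askFuture.complete("response"));
                    } else {
                        // The reply completes the future directly.
                        askFuture.complete("response");
                    }
                });
                try {
                    askFuture.get(500, TimeUnit.MILLISECONDS);
                    timedOut.complete(false);
                } catch (TimeoutException e) {
                    timedOut.complete(true);
                } catch (Exception e) {
                    timedOut.completeExceptionally(e);
                }
            });
            return timedOut.get(5, TimeUnit.SECONDS);
        } finally {
            // Already queued tasks still run; no new ones are accepted.
            consumerMailbox.shutdown();
            processorMailbox.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("reply via consumer mailbox, timed out: " + handleRequest(true));
        System.out.println("reply completes future directly, timed out: " + handleRequest(false));
    }
}
```

If the analogy holds for my actors, then replying to getSender() of the ask in RequestProcessorActor, instead of to the stored endpoint ActorRef, might be the thing to try, but I have not confirmed this.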
On Tuesday, 2 February 2016 03:28:00 UTC+2, Paul Cleary wrote:
>
> I agree with others, need to post some more code here.
>
> Also, how are you testing? I believe the TestKit and TestActorRef run
> single threaded.
>
> On Sunday, January 31, 2016 at 1:41:35 PM UTC-5, bangs wrote:
>>
>> I have an actor RequestConsumer (exposed over Camel netty http interface)
>> which consumes requests from clients. Since this actor has to block
>> in order to reply to the client, I have implemented RequestConsumer to call
>> ProcessActor using Patterns.ask and wait for a future response.
>> ProcessActor will persist the request, send it to a PaymentActor
>> asynchronously and also to a MerchantActor asynchronously. After receiving
>> responses asynchronously from these child actors the ProcessActor pipes the
>> future response to RequestConsumer. However, RequestConsumer always
>> receives the response after the timeout - like it's running on one thread
>> that can only be released after the timeout. What could I be missing?
>>
>>
>>