Thanks for the response. Well, the ask pattern is one way to go, but I thought I could avoid it and use only the flow's connection.
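A minimal sketch of the request/reply shape described in the quoted suggestion below, in plain Java with a CompletableFuture standing in for the ask. This is not Akka API: `Worker`, `Coordinator`, `ask`, `publish` and `workerFor` are hypothetical names chosen for illustration, and timeouts and error handling are left out. In the real flow, the CompletionStage returned by `Coordinator.ask` would presumably be what a mapAsync stage waits on before building the HttpResponse.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;

// Plain-Java model of a worker that cannot answer until someone feeds it data.
final class Worker {
    // One pending reply per request that is currently waiting on this worker.
    private final List<CompletableFuture<String>> pending = new ArrayList<>();

    // Called by the HTTP side: register a request and get a future for its reply.
    synchronized CompletionStage<String> ask() {
        CompletableFuture<String> reply = new CompletableFuture<>();
        pending.add(reply);
        return reply;
    }

    // Called by whoever feeds the worker: completes every waiting request
    // and returns how many requests were served.
    int publish(String data) {
        List<CompletableFuture<String>> waiting;
        synchronized (this) {
            waiting = new ArrayList<>(pending);
            pending.clear();
        }
        // Complete outside the lock so reply callbacks cannot re-enter it.
        waiting.forEach(reply -> reply.complete(data));
        return waiting.size();
    }
}

// Plain-Java model of the coordinator: worker lookup / creation by id.
final class Coordinator {
    private final Map<String, Worker> workers = new ConcurrentHashMap<>();

    // Look up the worker for an id, creating it on first use.
    Worker workerFor(String id) {
        return workers.computeIfAbsent(id, key -> new Worker());
    }

    // The function an asynchronous stage would call once per request.
    CompletionStage<String> ask(String id) {
        return workerFor(id).ask();
    }
}
```

With this shape, two connections asking for the same id each get their own future from the same worker, and a single `publish` on that worker completes both, which is the "one worker updates more than one stream" case from the original question.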
On Monday, 7 March 2016 01:02:08 UTC+1, Rafał Krzewski wrote:
>
> Hi Paweł,
>
> if I understand your use case correctly, mapAsync with an ask [1] inside
> should work just fine. You might want to introduce a coordinator actor
> that would deal with worker management, i.e. the HTTP flow sends a message
> (using the ask pattern) to the coordinator, the coordinator performs worker
> lookup / creation and forwards the message to the worker, and the worker
> replies directly to the temporary actor handling the ask, thus completing
> the Future. In the next stage, just marshal the data to an HttpEntity and
> you're done.
>
> Cheers,
> Rafał
>
> [1]
> http://doc.akka.io/japi/akka/2.4.2/akka/pattern/AskSupport.html#ask-akka.actor.ActorRef-java.lang.Object-akka.util.Timeout-
>
>
> On Sunday, 6 March 2016 23:25:07 UTC+1, paweł kamiński wrote:
>>
>> hi,
>> I have a simple HTTP service that accepts connections and keeps them
>> alive.
>> Once a client sends something, I look up an actorRef (WORKER) based on the
>> path/query of the request, and I would like to wait for a response from that
>> actor so I can respond back to the client.
>>
>> this is server-to-server communication, so there should be one connection
>> per machine, but there will be situations where the same path/query is sent
>> by more than one connection.
>> (in other words, one worker should be able to update more than one stream
>> created by an accepted connection - if this is even possible).
>> A worker can only produce a result if another actor sent it data in the
>> first place; a worker cannot produce results by itself. I want a worker to
>> represent a client in the akka system.
>>
>> The problem I am facing is that I cannot figure out how to create a flow
>> that gets an HttpRequest and produces an HttpResponse and, in the middle,
>> sends the incoming request to an actor and waits for the response.
>> so far I have come up with this code
>>
>> public void start() {
>>     Source<IncomingConnection, CompletionStage<ServerBinding>> serverSource = Http
>>         .get(system)
>>         .bind(ConnectHttp.toHost("localhost", port), materializer);
>>
>>     Flow<IncomingConnection, IncomingConnection, NotUsed> failureDetection = Flow
>>         .of(IncomingConnection.class)
>>         .watchTermination((_unused, termination) -> {
>>             termination.whenComplete((done, cause) -> {
>>                 if (cause != null) {
>>                     log.error("Connection was closed.", cause);
>>                 }
>>             });
>>             return NotUsed.getInstance();
>>         });
>>
>>     serverSource
>>         .via(failureDetection)
>>         // <--- send each new connection to actorRef
>>         .to(Sink.actorRef(connectionRouter, closeConnectionMsg))
>>         .run(materializer)
>>         .whenCompleteAsync((binding, failure) -> {
>>             if (failure != null) {
>>                 log.error("Could not initialize connection.", failure);
>>             }
>>         }, system.dispatcher());
>> }
>>
>>
>> // ConnectionRouter receive definition
>>
>> receive(ReceiveBuilder
>>     .match(IncomingConnection.class, connection -> {
>>         Flow<HttpRequest, HttpResponse, NotUsed> updateRequestFlow = Flow
>>             .of(HttpRequest.class)
>>             .map(request -> {
>>                 String mime = request.getHeader(Accept.class)
>>                     .map(HttpHeader::value)
>>                     .orElse("application/json");
>>
>>                 if (!isAcceptable(mime)) {
>>                     return HttpResponse.create()
>>                         .withStatus(StatusCodes.NOT_ACCEPTABLE)
>>                         .addHeader(RawHeader.create("Connection", "close"))
>>                         .addHeader(RawHeader.create("Content-Length", "0"));
>>                 }
>>
>>                 Query query = request.getUri().query();
>>                 Optional<String> idOp = query.get("id");
>>                 if (!idOp.isPresent()) {
>>                     return HttpResponse.create()
>>                         .withStatus(StatusCodes.BAD_REQUEST)
>>                         .addHeader(RawHeader.create("Connection", "close"))
>>                         .addHeader(RawHeader.create("Content-Length", "0"));
>>                 }
>>
>>                 String id = idOp.get();
>>                 // <--- retrieve or create new worker based on ID (it is limited set of ids)
>>                 ActorRef worker = actorsMap.get(id);
>>
>>                 // NOW worker.tell(READY_TO_GET_DATA_MSG) should eventually
>>                 // create some result that should be mapped to response
>>
>>                 byte[] bytes = toBytes(mime, RESULT_PRODUCED_BY_WORKER);
>>                 String length = Integer.toString(bytes.length);
>>                 return HttpResponse.create()
>>                     .withStatus(StatusCodes.OK)
>>                     .withEntity(HttpEntities.create(bytes))
>>                     .addHeader(RawHeader.create("Connection", "keep-alive"))
>>                     .addHeader(RawHeader.create("Content-Length", length))
>>                     .addHeader(RawHeader.create("Content-Type", mime));
>>             });
>>
>>         connection.handleWith(updateRequestFlow, materializer);
>>     })
>>     .build());
>>
>>
>> is this even possible with current akka-http / streams? I have been looking
>> into http://doc.akka.io/docs/akka/2.4.2/java/stream/stream-integrations.html
>> but mapAsync is rather not my use case; ActorPublisher might help, but
>> I cannot make it fit into the described flow.
>>
>> thanks for any ideas.
