hi,
I have a simple HTTP service that accepts connections and keeps them alive.
Once client sends something I look-up actorRef (WORKER) based on path/query
of request and I would like to wait for response from such actor so I can
respond back to client.
this is server to server communication so there should be one connection
per machine but there will be situation where same path/query is sent by
more then one connection.
(in other words one worker should be able to update more than one stream
created by accepted connection - if this is even possible).
A worker can only produce result if other actor sent it data in first
place, worker cannot produce results by itself. I want a worker to
represent client in akka system.
The problem I am facing is that I cannot figure out how to create such flow
that gets HttpRequest and produces HttpResponse and in the middle sends
incoming request to actor and waits for response.
so far I came with such code
public void start() {
Source<IncomingConnection, CompletionStage<ServerBinding>> serverSource =
Http
.get(system)
.bind(ConnectHttp.toHost("localhost", port), materializer);
Flow<IncomingConnection, IncomingConnection, NotUsed> failureDetection =
Flow
.of(IncomingConnection.class)
.watchTermination((_unused, termination) -> {
termination.whenComplete((done, cause) -> {
if (cause != null) {
log.error("Connection was closed.", cause);
}
});
return NotUsed.getInstance();
});
serverSource
.via(failureDetection)
// <--- send each new connection to actorRef
.to(Sink.actorRef(connectionRouter, closeConnectionMsg))
.run(materializer)
.whenCompleteAsync((binding, failure) -> {
if (failure != null) {
log.error("Could not initialize connection.", failure);
}
}, system.dispatcher());
}
// ConnectionRouter receive definition
receive(ReceiveBuilder
.match(IncomingConnection.class, connection -> {
Flow<HttpRequest, HttpResponse, NotUsed> updateRequestFlow = Flow
.of(HttpRequest.class)
.map(request -> {
String mime = request.getHeader(Accept.class)
.map(HttpHeader::value)
.orElse("application/json");
if (!isAcceptable(mime)) {
return HttpResponse.create()
.withStatus(StatusCodes.NOT_ACCEPTABLE)
.addHeader(RawHeader.create("Connection", "close"))
.addHeader(RawHeader.create("Content-Length", "0"));
}
Query query = request.getUri().query();
Optional<String> idOp = query.get("id");
if (!idOp.isPresent()) {
return HttpResponse.create()
.withStatus(StatusCodes.BAD_REQUEST)
.addHeader(RawHeader.create("Connection", "close"))
.addHeader(RawHeader.create("Content-Length", "0"));
}
String id = idOp.get();
// <--- retrieve or create new worker based on ID (it is
limited set of ids)
ActorRef worker = actorsMap.get(id);
// NOW worker.tell(READY_TO_GET_DATA_MSG) should eventually
create some result that should be mapped to response
byte[] bytes = toBytes(mime, RESULT_PRODUCED_BY_WORKER);
String length = Integer.toString(bytes.length);
return HttpResponse.create()
.withStatus(StatusCodes.OK)
.withEntity(HttpEntities.create(bytes))
.addHeader(RawHeader.create("Connection", "keep-alive"))
.addHeader(RawHeader.create("Content-Length", length))
.addHeader(RawHeader.create("Content-Type", mime));
});
connection.handleWith(updateRequestFlow, materializer);
})
.build());
is this even possible with current akka-http / streams ? I have been looking
into http://doc.akka.io/docs/akka/2.4.2/java/stream/stream-integrations.html
but mapAsync is rather not my use-case, ActorPublisher maybe would help but I
cannot make it fit into described flow.
thanks for any ideas.
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ:
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.