Hi all!
I'm trying to use akka-streams and akka-http in order to solve the
following problem:
- We have two HTTP clients (A and B) sending requests to one HTTP server (C).
Both sides use akka-http to communicate.
- Requests from A have higher priority than requests from B, so C should
process requests from A first and requests from B second.
- Of course, we would like back-pressure enabled on each end.
So the goal is to prioritize requests coming from many incoming connections.
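
To make the goal concrete, the prioritization step on its own can be sketched on plain streams with akka-stream's `MergePreferred` stage. This is only a sketch under assumptions, not a solution for HTTP connections: the `PriorityMerge` object and the `prioritized` helper are made-up names, and it is written against the same `FlowGraph` API used in this post (later Akka Streams releases rename `FlowGraph` to `GraphDSL`).

```scala
import akka.stream.SourceShape
import akka.stream.scaladsl.{FlowGraph, MergePreferred, Source}

object PriorityMerge {
  // Hypothetical helper: merges two sources into one. When both inputs
  // have an element ready, the element from `high` is emitted first.
  def prioritized[T](high: Source[T, _], low: Source[T, _]): Source[T, _] =
    Source.fromGraph(FlowGraph.create() { implicit b =>
      import FlowGraph.Implicits._
      // One preferred input plus one secondary input.
      val merge = b.add(MergePreferred[T](1))
      high ~> merge.preferred
      low  ~> merge.in(0)
      SourceShape(merge.out)
    })
}
```

The merged source still back-pressures both inputs, but a preferred input that always has elements ready can starve the secondary one, so this only approximates "A first, B second".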
Logically this looks pretty straightforward and like quite a common case,
but I have not been able to implement it so far.
I have the following code:
val in1 = Http().bind(interface = "localhost", port = 8200)
val in2 = Http().bind(interface = "localhost", port = 8201)

val connSrc = Source.fromGraph(FlowGraph.create() { implicit b =>
  import FlowGraph.Implicits._
  val merge = b.add(Merge[IncomingConnection](2))
  in1 ~> print("in1") ~> merge.in(0)
  in2 ~> print("in2") ~> merge.in(1)
  SourceShape(merge.out)
})

val reqSrc: Source[(HttpRequest, IncomingConnection), _] =
  connSrc.flatMapConcat { conn =>
    Source.empty[HttpResponse]
      .via(conn.flow)
      .map(request => (request, conn))
  }

val flow: Flow[(HttpRequest, IncomingConnection), (HttpResponse, IncomingConnection), _] =
  Flow[(HttpRequest, IncomingConnection)].map {
    case (HttpRequest(HttpMethods.GET, Uri.Path("/ping"), _, entity, _), conn: IncomingConnection) =>
      println(s"${System.currentTimeMillis()}: " +
        s"process request from ${conn.remoteAddress.getHostName}:${conn.remoteAddress.getPort}")
      (HttpResponse(entity = "pong"), conn)
  }

reqSrc.via(flow).to(Sink.foreach { case (resp, conn) =>
  Source.single(resp).via(conn.flow).runWith(Sink.ignore)
}).run()

def print(prefix: String) = Flow[IncomingConnection].map { s =>
  println(s"$prefix [ ${System.currentTimeMillis()} ]: ${s.remoteAddress}")
  s
}

Basically, I first try to build a source of HTTP requests coming from
different connections.
Then I try to respond using the incoming connection, which is bundled with
the response.
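
For comparison, the per-connection style passes a request-to-response flow to `IncomingConnection.handleWith`, which joins the handler to `conn.flow` and materializes it exactly once per connection. Below is a minimal sketch of that style, assuming a single port and the same `/ping` route. The names `PingServer`, `respond` and `start` are illustrative only, and this variant does not prioritize anything across connections.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.stream.Materializer
import akka.stream.scaladsl.{Flow, Sink}

object PingServer {
  // Pure request handler mirroring the /ping case from the post;
  // anything else gets a 404 (an assumption, the post has no fallback case).
  def respond(request: HttpRequest): HttpResponse = request match {
    case HttpRequest(HttpMethods.GET, Uri.Path("/ping"), _, _, _) =>
      HttpResponse(entity = "pong")
    case _ =>
      HttpResponse(StatusCodes.NotFound)
  }

  // Assumed wiring: every accepted connection is handled by its own
  // materialization of the handler flow.
  def start(port: Int)(implicit system: ActorSystem, mat: Materializer) =
    Http().bind(interface = "localhost", port = port)
      .to(Sink.foreach[Http.IncomingConnection] { conn =>
        conn.handleWith(Flow[HttpRequest].map(respond))
      })
      .run()
}
```

Calling `PingServer.start(8200)` with an implicit `ActorSystem` and materializer in scope would return the future of the server binding.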
When I make a request with curl:
% curl http://localhost:8200/ping
curl: (52) Empty reply from server
this code produces the following output:
in1 [ 1450287301512 ]: /127.0.0.1:52461
1450287301626: process request from localhost:52461
[INFO] [12/16/2015 20:35:01.641] [default-akka.actor.default-dispatcher-6] [akka://default/system/IO-TCP-STREAM/server-1-localhost%2F127.0.0.1%3A8200] Message [akka.io.Tcp$Unbound$] from Actor[akka://default/system/IO-TCP/selectors/$a/0#119537130] to Actor[akka://default/system/IO-TCP-STREAM/server-1-localhost%2F127.0.0.1%3A8200#-1438663077] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [12/16/2015 20:35:01.641] [default-akka.actor.default-dispatcher-6] [akka://default/system/IO-TCP-STREAM/server-2-localhost%2F127.0.0.1%3A8201] Message [akka.io.Tcp$Unbound$] from Actor[akka://default/system/IO-TCP/selectors/$a/1#679898594] to Actor[akka://default/system/IO-TCP-STREAM/server-2-localhost%2F127.0.0.1%3A8201#1414174163] was not delivered. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
It looks like something is wrong with the code.
Maybe the problem is in how I use the flow from IncomingConnection?
Thanks in advance for your help.