Hi all!

I'm trying to use akka-streams and akka-http in order to solve the 
following problem:

 - we have 2 http clients (A and B) sending requests to 1 http server (C). 
Both sides use akka-http to communicate.
 - requests from A have higher priority compared to B, so C should first 
process requests from A, requests from B are of second priority
 - of course we would like to have back-pressure on each end enabled 

So the goal is to prioritize requests coming from many incoming connections.
Thinking logically that looks pretty straight forward and quite common 
case...
But I could not implement this so far.

I have the following code:
 
    val in1 = Http().bind(interface = "localhost", port = 8200)
    val in2 = Http().bind(interface = "localhost", port = 8201)

    val connSrc = Source.fromGraph(FlowGraph.create() { implicit b =>
      import FlowGraph.Implicits._

      val merge = b.add(Merge[IncomingConnection](2))

      in1 ~> print("in1") ~> merge.in(0)
      in2 ~> print("in2") ~> merge.in(1)

      SourceShape(merge.out)
    })

    val reqSrc : Source[(HttpRequest, IncomingConnection), _] =
      connSrc.flatMapConcat { conn =>
        Source.empty[HttpResponse]
          .via(conn.flow)
          .map(request => (request, conn))
      }

    val flow: Flow[(HttpRequest, IncomingConnection), (HttpResponse, 
IncomingConnection), _] =
      Flow[(HttpRequest, IncomingConnection)].map{
          case (HttpRequest(HttpMethods.GET, Uri.Path("/ping"), _, entity, 
_), conn: IncomingConnection) =>
            println(s"${System.currentTimeMillis()}: " +
              s"process request from 
${conn.remoteAddress.getHostName}:${conn.remoteAddress.getPort}")
            (HttpResponse(entity = "pong"), conn)
        }

    reqSrc.via(flow).to(Sink.foreach { case (resp, conn) =>
      Source.single(resp).via(conn.flow).runWith(Sink.ignore)
    }).run()

    def print(prefix: String) = Flow[IncomingConnection].map { s =>
      println(s"$prefix [ ${System.currentTimeMillis()} ]: 
${s.remoteAddress}"); s
    }

Basically first I try to get the source of http requests coming from 
different connections. 
Than I try to respond using the incoming connection which is bundled with 
response.

When making request from curl:

    % curl http://localhost:8200/ping
    curl: (52) Empty reply from server

this code produces the following output:

         in1 [ 1450287301512 ]: /127.0.0.1:52461
    1450287301626: process request from localhost:52461
    [INFO] [12/16/2015 20:35:01.641] 
[default-akka.actor.default-dispatcher-6] 
[akka://default/system/IO-TCP-STREAM/server-1-localhost%2F127.0.0.1%3A8200] 
Message [akka.io.Tcp$Unbound$] from 
Actor[akka://default/system/IO-TCP/selectors/$a/0#119537130] to 
Actor[akka://default/system/IO-TCP-STREAM/server-1-localhost%2F127.0.0.1%3A8200#-1438663077]
 
was not delivered. [1] dead letters encountered. This logging can be turned 
off or adjusted with configuration settings 'akka.log-dead-letters' and 
'akka.log-dead-letters-during-shutdown'.
    [INFO] [12/16/2015 20:35:01.641] 
[default-akka.actor.default-dispatcher-6] 
[akka://default/system/IO-TCP-STREAM/server-2-localhost%2F127.0.0.1%3A8201] 
Message [akka.io.Tcp$Unbound$] from 
Actor[akka://default/system/IO-TCP/selectors/$a/1#679898594] to 
Actor[akka://default/system/IO-TCP-STREAM/server-2-localhost%2F127.0.0.1%3A8201#1414174163]
 
was not delivered. [2] dead letters encountered. This logging can be turned 
off or adjusted with configuration settings 'akka.log-dead-letters' and 
'akka.log-dead-letters-during-shutdown'.

Looks like something wrong with code.

Maybe there are some problems with using flow from IncomingConnection?

Thanks in advance for your help.

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to