I have an unknown number of akka-http client flows downloading data from an 
HTTP service. I'm using akka-http's host-level connection pool because I 
would like to customise the pool, since long-running requests go through it.

Since the number of clients is unknown and dynamic, I don't know how to 
configure the connection pool (max-open-requests/max-connections). 
Additionally, I might want the connection pool to be small (fewer 
connections than clients) so as not to saturate the bandwidth.
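
For context, this is roughly how I would set those two pool limits in code. 
It is only a sketch, assuming akka-http 10.0.x/10.1.x; the object name and 
the values 4 and 32 are made up for illustration.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.settings.ConnectionPoolSettings

object PoolSettingsSketch {
  // Hypothetical system name, used only for this sketch.
  implicit val system: ActorSystem = ActorSystem("pool-settings-sketch")

  // Illustrative values only: a small pool of 4 connections and at most
  // 32 open requests. max-open-requests has to be a power of 2.
  val poolSettings: ConnectionPoolSettings =
    ConnectionPoolSettings(system)
      .withMaxConnections(4)
      .withMaxOpenRequests(32)
}
```

The resulting value can be passed as the settings argument of 
Http().cachedHostConnectionPool. The same limits can also be set under 
akka.http.host-connection-pool in application.conf.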

Thus, I would like to set up a client flow so that new connections and 
requests to the pool are backpressured:

1. Does this mean I will need to have a single materialised client flow?

2. How can I materialise as many client flows as I want, such that requests 
are backpressured when no connections are available (that is, when there is 
no demand from downstream)?

My first attempt was the Source.single pattern 
<https://doc.akka.io/docs/akka-http/current/scala/http/client-side/host-level.html#examples>.
However, this approach can exceed max-open-requests and throw an exception, 
as it materialises a new flow instance each time a request is sent to the 
server.

My second attempt was Source.queue 
<https://doc.akka.io/docs/akka-http/current/scala/http/client-side/host-level.html#using-the-host-level-api-in-a-streaming-fashion>.
This approach creates a single flow into which all requests are enqueued. 
However, despite the documentation, SourceQueue's 
OverflowStrategy.backpressure does not work: when max-connections or 
max-open-requests is exceeded, akka-http throws an exception 
<https://stackoverflow.com/questions/47731798/how-to-enable-source-queue-backpressure/47733277?noredirect=1#comment82480301_47733277>.
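
For reference, here is a minimal sketch of the queue-based approach, assuming 
akka-stream 2.5.x. To keep it self-contained, Int elements stand in for the 
(HttpRequest, Promise[HttpResponse]) pairs and a plain Sink stands in for the 
pool flow; QueueOfferSketch and enqueue are hypothetical names. As far as I 
understand the Source.queue documentation, with OverflowStrategy.backpressure 
the Future returned by offer completes only once the buffer has room, and a 
second offer made before that Future completes fails, so each producer would 
have to wait for its previous offer.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, OverflowStrategy, QueueOfferResult}
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Future

object QueueOfferSketch {
  implicit val system: ActorSystem = ActorSystem("queue-offer-sketch")
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  import system.dispatcher

  // One materialised stream; every producer offers into the same queue.
  // The Sink is a stand-in for the connection pool flow (assumption).
  val queue =
    Source
      .queue[Int](8, OverflowStrategy.backpressure)
      .to(Sink.foreach(n => println(s"processed $n")))
      .run()

  // Maps every possible offer result onto the returned Future.
  def enqueue(n: Int): Future[Unit] =
    queue.offer(n).flatMap {
      case QueueOfferResult.Enqueued    => Future.successful(())
      case QueueOfferResult.Dropped     => Future.failed(new RuntimeException("element dropped"))
      case QueueOfferResult.Failure(ex) => Future.failed(ex)
      case QueueOfferResult.QueueClosed => Future.failed(new RuntimeException("queue closed"))
    }
}
```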

Can I accomplish backpressure using the host-level API in a streaming fashion 
<https://doc.akka.io/docs/akka-http/current/scala/http/client-side/host-level.html#using-the-host-level-api-in-a-streaming-fashion>
with a single client flow, adding new requests through a MergeHub 
<https://doc.akka.io/docs/akka/2.5.8/stream/stream-dynamic.html?language=scala>?

This is my solution:



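  import akka.NotUsed
  import akka.http.scaladsl.Http
  import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
  import akka.stream.scaladsl.{Flow, Keep, MergeHub, RunnableGraph, Sink, Source}
  import scala.concurrent.{Future, Promise}
  import scala.util.{Failure, Success, Try}

  // Single shared pool flow; each request carries the promise for its response.
  private lazy val poolFlow: Flow[(HttpRequest, Promise[HttpResponse]),
                                  (Try[HttpResponse], Promise[HttpResponse]),
                                  Http.HostConnectionPool] =
    Http().cachedHostConnectionPool[Promise[HttpResponse]](host, port)

  // Completes each request's promise with the pool's result.
  val ServerSink =
    poolFlow.async.toMat(Sink.foreach({
      case (Success(resp), p) => p.success(resp)
      case (Failure(e), p)    => p.failure(e)
    }))(Keep.left)

  // MergeHub lets any number of producers attach to the one pool flow.
  val runnableGraph: RunnableGraph[Sink[(HttpRequest, Promise[HttpResponse]), NotUsed]] =
    MergeHub
      .source[(HttpRequest, Promise[HttpResponse])](perProducerBufferSize = 16)
      .to(ServerSink)

  val toConsumer: Sink[(HttpRequest, Promise[HttpResponse]), NotUsed] =
    runnableGraph.run()

  protected[akkahttp] def executeRequest[T](
      httpRequest: HttpRequest,
      unmarshal: HttpResponse => Future[T]): Future[T] = {
    val responsePromise = Promise[HttpResponse]()
    // Attach a one-element producer to the hub for this request.
    Source.single(httpRequest -> responsePromise).runWith(toConsumer)
    responsePromise.future.flatMap(handleHttpResponse(_, unmarshal))
  }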
