Enter code here...
I have undefined amount of akka-http client flows downloading data from an http service. I'm using akka-http host-level connection pooling because I would like to customise the pool, since there are long running requests going through it. Since, the number of clients is undefined and dynamic, I don't know how to configure the connection pool (max-open-requests/max-connections). Additionally, I might want the connection pool to be small (less than number of clients) to not damage the bandwidth. Thus, I would like to set up a client flow so that new connections and requests to the pool are backpressured: 1.Does this mean I will need to have a single materialised client flow? 2.How I materialise as many client flows as I want, such that if there are no available connections (demand from downstream) requests will be back pressured. My first attempt was Source.single pattern <https://doc.akka.io/docs/akka-http/current/scala/http/client-side/host-level.html#examples>, however this method can exceed max-open-request and throw an exception as it creates a new flow instance each time a request is sent to a server. My second attempt was Source.Queue <https://doc.akka.io/docs/akka-http/current/scala/http/client-side/host-level.html#using-the-host-level-api-in-a-streaming-fashion>, This method creates a single flow to which all requests are enqueued: however despite the documentaiton SourceQueue's OverflowStrategy backpressured does not work and when it exceeds max-connection or max-open-request, akka-http throws an exception <https://stackoverflow.com/questions/47731798/how-to-enable-source-queue-backpressure/47733277?noredirect=1#comment82480301_47733277> Can I accomplish backpressure using host-level streaming fashion <https://doc.akka.io/docs/akka-http/current/scala/http/client-side/host-level.html#using-the-host-level-api-in-a-streaming-fashion> and have one client flow and add new requests using with MergeHub <https://doc.akka.io/docs/akka/2.5.8/stream/stream-dynamic.html?language=scala> ? This is my solution: private lazy val poolFlow: Flow[(HttpRequest, Promise[HttpResponse]), (Try[HttpResponse], Promise[HttpResponse]), Http.HostConnectionPool] = Http().cachedHostConnectionPool[Promise[HttpResponse]](host, port) val ServerSink = poolFlow.async.toMat(Sink.foreach({ case ((Success(resp), p)) => p.success(resp) case ((Failure(e), p)) => p.failure(e) }))(Keep.left) val runnableGraph: RunnableGraph[Sink[(HttpRequest, Promise[HttpResponse]), NotUsed]] = MergeHub.source[(HttpRequest, Promise[HttpResponse])](perProducerBufferSize = 16).to(ServerSink) val toConsumer: Sink[(HttpRequest, Promise[HttpResponse]), NotUsed] = runnableGraph.run() protected[akkahttp] def executeRequest[T](httpRequest: HttpRequest, unmarshal: HttpResponse => Future[T]): Future[T] = { val responsePromise = Promise[HttpResponse]() Source.single((httpRequest -> responsePromise)).runWith(toConsumer) responsePromise.future.flatMap(handleHttpResponse(_, unmarshal)) ) } -- >>>>>>>>>> Read the docs: http://akka.io/docs/ >>>>>>>>>> Check the FAQ: >>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to [email protected]. To post to this group, send email to [email protected]. Visit this group at https://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
