Hi David,

The pool is by nature unordered, since it is a pool of active workers: some requests might be faster, others slower. You can get ordered responses by using a single HTTP connection through the connection-level API: http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0-M1/scala/http/client-side/connection-level.html
That has the drawback that it maps to a single connection and does not do reconnects.

Alternatively, you can use mapAsync() with the request-level API: http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0-M1/scala/http/client-side/request-level.html

That way you can launch multiple requests in parallel and still receive the responses in order, because mapAsync() emits results in the order of its inputs; the requests themselves go through the default SuperPool. The drawback of this approach is that it has more overhead than the approaches above. Still, in your case this might be the best approach.

-Endre

On Wed, Nov 18, 2015 at 1:59 AM, David Knapp <[email protected]> wrote:

> I'm trying to use an Http pool inside a graph, and the fact that the
> results of the pool can come out of order is really throwing me off. Is
> there a way of forcing the pool to return ordered? Or is there some other
> way of being able to do a graph without deadlocking myself? This is a small
> example of what I'm trying to do.
>
> val poolClientFlow: Flow[(HttpRequest, Int), (Try[HttpResponse], Int), HostConnectionPool]
>
> val responseHandler: Flow[(Try[HttpResponse], Int), HttpResponse, Unit]
>
> val reqNumber = new AtomicInteger(1)
>
> val requestFlow = Flow[HttpRequest].map((_, reqNumber.getAndIncrement)).via(poolClientFlow).via(responseHandler)
>
> val integratedHttpFlow = Flow() { implicit b =>
>   import FlowGraph.Implicits._
>
>   val httpFlow: Flow[Long, Unit, Unit] =
>     Flow[Long].map(_ => HttpRequest()).via(requestFlow).map(doSomeStuff)
>
>   val zip = b.add(ZipWith[Unit, Long, Long]((left: Unit, right: Long) => right))
>   val sourceBroadcaster = b.add(Broadcast[Long](2))
>   val httpF = b.add(httpFlow)
>
>   val doThisAfterHttpCompletes = b.add(Flow[Long].map(useLong))
>
>   sourceBroadcaster ~> httpF ~> zip.in0
>   sourceBroadcaster ~> zip.in1
>   zip.out ~> doThisAfterHttpCompletes
>
>   (sourceBroadcaster.in, doThisAfterHttpCompletes.outlet)
> }
>
> I hope that's clear enough. I simplified my code as much as possible.
> What it boils down to, I guess, is that I have an object, and I want to turn it
> into an http request and then, when that http request is done, I want to do
> something else on that original object.
>
> --
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to [email protected].
> To post to this group, send email to [email protected].
> Visit this group at http://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.

--
Akka Team
Typesafe - Reactive apps on the JVM
Blog: letitcrash.com
Twitter: @akkateam
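The mapAsync() suggestion in the reply can be sketched roughly as follows. This is only an illustration under stated assumptions, not code from the thread: the object name OrderedRequests, the helper toRequest, the example.com URI, and the default parallelism of 4 are all hypothetical. The sending function is passed in as a parameter so that it could be Http().singleRequest(_) in real use or a stub in a test. Each element is paired with its own response, which addresses the "do something else on that original object" requirement without a Broadcast/Zip.

```scala
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream.scaladsl.Flow
import scala.concurrent.{ExecutionContext, Future}

object OrderedRequests {

  // Hypothetical helper: how an element becomes a request is an assumption.
  def toRequest(id: Long): HttpRequest =
    HttpRequest(uri = s"http://example.com/items/$id")

  // `send` is expected to be something like `Http().singleRequest(_)`.
  // Up to `parallelism` requests are in flight at once, but mapAsync emits
  // results downstream in the order of the incoming elements.
  def flow(send: HttpRequest => Future[HttpResponse], parallelism: Int = 4)(
      implicit ec: ExecutionContext) =
    Flow[Long].mapAsync(parallelism) { id =>
      // Keep the original element next to its response.
      send(toRequest(id)).map(response => (id, response))
    }
}
```

A possible use would be Source(ids).via(OrderedRequests.flow(Http().singleRequest(_))), followed by a stage that handles each (id, response) pair. Depending on the Akka version, singleRequest may need an implicit materializer in scope, and each response entity should be consumed or discarded so the pooled connection is released.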
