Thank you for your response. Indeed, when using http.singleRequest I can combine the request and the unmarshalling into a single stage, and that solves the problem.
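For reference, here is the self-contained shape I ended up with (just a sketch: the example.com URIs, the parallelism value, and unmarshalling to String are placeholders; my real code unmarshals to PareResult with its own unmarshaller):

    import akka.actor.ActorSystem
    import akka.http.scaladsl.Http
    import akka.http.scaladsl.model.HttpRequest
    import akka.http.scaladsl.unmarshalling.Unmarshal
    import akka.stream.ActorMaterializer
    import akka.stream.scaladsl.{Sink, Source}

    implicit val system = ActorSystem()
    implicit val materializer = ActorMaterializer()
    import system.dispatcher

    val parallelism = 4
    val source = Source(List("http://example.com/a", "http://example.com/b", "http://example.com/c"))
      .map(uri => HttpRequest(uri = uri))

    source
      .mapAsyncUnordered(parallelism) { request =>
        // request and unmarshalling happen in one stage, so every response
        // this stage produces is also consumed by it; nothing can be left
        // in a buffer between stages when take(2) cancels the stream
        Http().singleRequest(request).flatMap(response => Unmarshal(response).to[String])
      }
      .take(2)
      .runWith(Sink.ignore)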
Initially, though, I had tried to use Http().cachedHostConnectionPool(...), which is why the code looked the way it did. Do I understand correctly that in this case cachedHostConnectionPool is essentially unusable? As I understand it, the pool flow always runs in a separate stage from the next stage that actually consumes the response, so something further down the stream can always complete or abort the stream (for example through an exception) and cause the connections to leak.

On Tuesday, August 22, 2017 at 12:03:18 PM UTC+2, [email protected] wrote:
>
> Hi Jerry,
>
> Your explanation is spot-on. You need to make sure that the entities of
> all responses are consumed. In your case, that may not happen because of
> a race condition: `take(2)` will cancel the stream, and responses might
> get discarded between the first and the second `mapAsyncUnordered`. That
> this permanently stalls the host connection pool is a bug
> (https://github.com/akka/akka-http/issues/1248).
>
> It might help to combine the two mapAsyncUnordered stages into one, to
> make sure that every entity is consumed, like this:
>
> source
>   .mapAsyncUnordered(parallelism)(x =>
>     http.singleRequest(x).flatMap(response =>
>       Unmarshal(response).to[PareResult]
>     )
>   )
>   .take(2)
>   .to(Sink.ignore)
>
> Johannes
>
> On Monday, August 21, 2017 at 10:54:51 PM UTC+2, Jerry Tworek wrote:
>>
>> Hi,
>>
>> I'm constructing a pipeline where I send a series of requests to a host,
>> parse them, process them, and save them into a database, and after some
>> iterations my program seems to get stuck. I've tried to dig as deeply as
>> I can into the cause, but there are enough moving parts that it's hard
>> to pinpoint exactly what's happening. It seems that I'm exhausting the
>> number of available HTTP connections to the given host, and new requests
>> stall while the old ones for some reason never get "drained".
>>
>> The code behaves the same way whether I use a connection pool or single
>> requests. The easiest way to reproduce the issue is to use .take(n) on
>> the stream, e.g. like this:
>>
>> source
>>   .mapAsyncUnordered(parallelism)(x => http.singleRequest(x))
>>   .mapAsyncUnordered(parallelism)(x => Unmarshal(x).to[PareResult])
>>   .take(2)
>>   .to(Sink.ignore)
>>
>> My production case was slightly more complex, but after some time I
>> managed to reduce it to the above. I don't know whether my understanding
>> is correct, but my guess is that more than two requests are pulled from
>> the source and land in a buffer somewhere between the stream stages.
>> Once two of them reach the .take(2) stage, downstream finishes the
>> stream, which completes the unmarshalling stage, and the remaining
>> responses stay in limbo, never reaching the unmarshalling stage, thus
>> never being drained and therefore blocking further HTTP requests.
>>
>> So my question is: is my interpretation above correct, or is there
>> another issue in play that I don't understand? If it is correct, is the
>> bug in my code for constructing the stream, or is it a bug in Akka
>> Streams?
>>
>> Regards,
>> Jerry
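P.S. The only pool-based variant I could come up with that avoids the race in my reduced example is to apply the limit before the requests enter the pool flow, so the pool never emits a response that downstream won't consume. A sketch (the host, URIs, and parallelism are made up; this only works when the limit is known upfront and doesn't protect against the stream failing further downstream):

    import akka.actor.ActorSystem
    import akka.http.scaladsl.Http
    import akka.http.scaladsl.model.HttpRequest
    import akka.http.scaladsl.unmarshalling.Unmarshal
    import akka.stream.ActorMaterializer
    import akka.stream.scaladsl.{Sink, Source}
    import scala.concurrent.Future
    import scala.util.{Failure, Success}

    implicit val system = ActorSystem()
    implicit val materializer = ActorMaterializer()
    import system.dispatcher

    val parallelism = 4
    val poolFlow = Http().cachedHostConnectionPool[Int]("example.com")

    Source(1 to 10)
      .map(i => HttpRequest(uri = s"/item/$i") -> i)
      .take(2)  // limit *before* the pool: only two requests are ever sent,
                // so every response the pool emits is consumed downstream
      .via(poolFlow)
      .mapAsyncUnordered(parallelism) {
        case (Success(response), _) =>
          // consume the entity in the same stage that received the response
          Unmarshal(response).to[String]
        case (Failure(ex), _) =>
          Future.failed(ex)
      }
      .runWith(Sink.ignore)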
