Hi Jerry,

Your explanation is spot-on. You need to make sure that the entities of 
all responses are consumed. In your case, that may not happen because of a 
race condition: `take(2)` cancels the stream, and responses that are in 
flight between the first and the second `mapAsyncUnordered` may get 
discarded without their entities ever being read. That this permanently 
stalls the host connection pool is a bug 
(https://github.com/akka/akka-http/issues/1248).

It might help to combine the two `mapAsyncUnordered` stages into one, so 
that every response's entity is consumed in the same stage that issued the 
request, like this:

source
  .mapAsyncUnordered(paralellism)(x =>
    http.singleRequest(x).flatMap(response =>
      Unmarshal(response).to[PareResult]
    )
  )
  .take(2)
  .to(Sink.ignore)
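
For reference, here is a rough self-contained sketch of the same idea. It is not your actual code: the request list and the `http://example.com/` host are placeholders, `parallelism = 4` is an assumed value, `String` stands in for `PareResult` (whose unmarshaller I don't have), and the `ActorMaterializer` setup assumes an Akka HTTP 10.0.x-style project.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.HttpRequest
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

object CombinedStageSketch {
  // Assumed value; the thread does not say what `paralellism` is set to.
  val parallelism = 4

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("sketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    import system.dispatcher // ExecutionContext for flatMap and Unmarshal

    val http = Http()

    // Placeholder source: ten identical requests to a placeholder host.
    val source = Source(List.fill(10)(HttpRequest(uri = "http://example.com/")))

    val done = source
      .mapAsyncUnordered(parallelism) { request =>
        // The request and the consumption of its entity live in one stage,
        // so a response should not end up unread between two stages.
        http.singleRequest(request).flatMap { response =>
          Unmarshal(response.entity).to[String] // String stands in for PareResult
        }
      }
      .take(2)
      .runWith(Sink.ignore)

    // Shut the actor system down once the stream has finished.
    done.onComplete(_ => system.terminate())
  }
}
```

If you don't need the body of a response at all, `response.discardEntityBytes()` drains it instead of unmarshalling it.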

Johannes

On Monday, August 21, 2017 at 10:54:51 PM UTC+2, Jerry Tworek wrote:
>
> Hi,
>
> I'm constructing a pipeline where I send a series of requests to a host, 
> parse them, process and save into a database and after some iteration it 
> seems my program is getting stuck. 
> I've tried to dive as deeply as I can into the cause, but there are enough 
> moving parts that it's hard to pinpoint exactly what's happening. It seems 
> that I'm exhausting the amount of available http connections to given host, 
> and new requests are just stalled while the old ones for some reason don't 
> get "drained".
>
> The code functions in the same way, whether I use connection pool or a 
> single request. The easiest way to replicate the issue is to use .take(n) 
> on the stream e.g. in the following way:
>
> source.mapAsyncUnordered(paralellism)(x => 
> http.singleRequest(x)).mapAsyncUnordered(paralellism)(x => 
> Unmarshal(x).to[PareResult]).take(2).to(Sink.ignore)
>
> My production case was slightly more complex, but after some time I 
> managed to reduce it to the above. I don't know if my understanding is 
> correct, but I guess, that more requests are requested from source than two 
> and land in a buffer somewhere between stream stages. Then after 2 reach 
> the stage .take(2), downstream finishes the stream which means the 
> unmarshalling stage is finished and some requests stay in the "limbo" never 
> reaching unmarshalling stage, thus never being drained and therefore 
> blocking http requests. 
>
> So my question is, is my interpretation above correct or is there another 
> possible issue in play that I don't understand? If it is, is it my bug in 
> the code of constructing the stream incorrectly, or is it a bug in Akka 
> streams? 
>
> Regards,
> Jerry
>
