Did you make any progress on this? I have a flow that has process incoming 
messages from a SQS queue and a REST endpoint; the message format and 
processing is the same for both sources so my plan is to create a actor 
publisher that will receive messages from the REST layer and an infinite 
source that will pull messages from SQS. Messages coming from the REST 
endpoint will have an optional "continuation" that will be used to complete 
the HTTP request (the continuation function is created using the 
completeWith directive).

https://github.com/lvicentesanchez/random/blob/master/src/main/scala/io/github/lvicentesanchez/Boot.scala#L41

My current publisher is unbounded,

https://github.com/lvicentesanchez/random/blob/master/src/main/scala/io/github/lvicentesanchez/streams/source/UnboundedPublisher.scala

so there is a potential memory issue there on top of the DDoS caused by 
opening to many connections.

I could implement the connection backpressure by creating a 
BoundedPublisher that can hold a maximum number of elements in a buffer. If 
we receive a request from the REST layer and the buffer is full, my 
BoundedPublisher will use the continuation to complete the request with a 
503 Server busy. The problem with this approach is that I will have to 
change the continuation to be able to handle the backpressure case and my 
response objects would have to carry low level information, if I want to be 
able to create a generic BoundPublisher source.

An alternative approach would be to create a "buffering" FlexiRoute with 2 
outputs connected to Source.connection. That flexiroute would buffer up-to 
N requests; if a new request arrives, when the buffer is full, I would send 
it to the "backpressure" output that would end in a sink that would will 
complete the connection. If the buffer is not full it will be routed it to 
the "routing" output, that will handle it using an akka-http routing Flow 
(created with Route.asyncHandler)

I'm leaning towards the second approach, but I don't know if it's feasible 
or not.

El miércoles, 28 de enero de 2015, 9:20:33 (UTC), Jean Rossier escribió:
>
> Hi Roland,
>
> thanks for the reply.
> Do you know if this feature will be available in the 1.0 release ? if yes, 
> when should it be available ? 
>
> Regarding the "awkward" implementation, and following your advise, I'm 
> trying to do something like the following:
> val serverBinding = Http(system).bind(interface = host, port = port)
> serverBinding.connections.buffer(5, 
> OverflowStrategy.dropTail).map(incomingConnectionToHttpRequest).mapAsync[HttpResponse](httpRequestToHttpResponseFuture).to(BlackholeSink).run()
>
> but I can't figure out how to get an HttpRequest out of an 
> IncomingConnection. Is there any helper available in Akka HTTP ?
>
> Thanks,
> Jean
>
> On Monday, January 26, 2015 at 10:35:59 AM UTC+1, rkuhn wrote:
>>
>> Hi Jean,
>>
>> calling “connection handleWith myFlow” is a really quick operation, 
>> hence you see no back-pressure kicking in. The feature you are after is 
>> currently rather awkward to implement, you’d have to replace handleWith with 
>> manually joining and running the connection’s Flow in order to obtain a 
>> Future for its termination, and then you’d mapAsync into a BlackholeSink 
>> instead of using a ForeachSink.
>>
>> Since this is a common thing people will want to do, we will provide this 
>> (or an equivalent solution) with Akka HTTP; this is one of the joys of 
>> using development previews :-) But thanks for reporting in any case!
>>
>> Regards,
>>
>> Roland
>>
>> 26 jan 2015 kl. 10:04 skrev Jean Rossier <[email protected]>:
>>
>> I would have expected some failed requests.
>> In my test, I'm sending 200 concurrent requests (the processing of each 
>> request takes circa 200ms), Since I have a buffer of 5, I thought that some 
>> requests would have been dropped and my client would have received some 
>> errors. But, in my test, all requests are replied sucessfully.
>> I thought OverflowStrategy.dropTail would drop the incoming requests when 
>> the buffer is full.
>>
>> Jean
>>
>> On Monday, January 26, 2015 at 9:38:36 AM UTC+1, drewhk wrote:
>>>
>>> Hi Jean,
>>>
>>> On Mon, Jan 26, 2015 at 9:03 AM, Jean Rossier <[email protected]> 
>>> wrote:
>>>
>>>> Hello,
>>>>
>>>> I'm wondering if there is a way to use back-pressure mechanism on 
>>>> incoming connections with Akka HTTP ? 
>>>> The documentation mentions clearly that there is no back-pressure 
>>>> applied to the connections Source if we use the startHandlingWith* 
>>>> functions (
>>>> http://doc.akka.io/api/akka-stream-and-http-experimental/1.0-M2/?_ga=1.267496717.2041479737.1421658563#akka.http.Http$$ServerBinding).
>>>>  
>>>> Therefore, I tried to do something like the following:
>>>> val serverBinding = Http(system).bind(interface = "localhost", port = 
>>>> port)
>>>> serverBinding.connections.buffer(5, 
>>>> OverflowStrategy.dropTail).to(ForeachSink(_ handleWith myFlow)).run()
>>>>
>>>> But, when sending a bunch of concurrent connections to my server, the 
>>>> buffer in the above code doesn't seem to have any effect.
>>>>
>>>
>>> What do yo mean it had no effect? What did you expect it do do and what 
>>> happened? What do you think OverflowStrategy.dropTail will do here?
>>>
>>> -Endre
>>>  
>>>
>>>> Am I doing anything wrong ?
>>>>
>>>> Cheers,
>>>> Jean
>>>>
>>>> -- 
>>>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>>>> >>>>>>>>>> Check the FAQ: 
>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>> >>>>>>>>>> Search the archives: 
>>>> https://groups.google.com/group/akka-user
>>>> --- 
>>>> You received this message because you are subscribed to the Google 
>>>> Groups "Akka User List" group.
>>>> To unsubscribe from this group and stop receiving emails from it, send 
>>>> an email to [email protected].
>>>> To post to this group, send email to [email protected].
>>>> Visit this group at http://groups.google.com/group/akka-user.
>>>> For more options, visit https://groups.google.com/d/optout.
>>>>
>>>
>>>
>> -- 
>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>> >>>>>>>>>> Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
>> --- 
>> You received this message because you are subscribed to the Google Groups 
>> "Akka User List" group.
>> To unsubscribe from this group and stop receiving emails from it, send an 
>> email to [email protected].
>> To post to this group, send email to [email protected].
>> Visit this group at http://groups.google.com/group/akka-user.
>> For more options, visit https://groups.google.com/d/optout.
>>
>>
>>
>>
>> *Dr. Roland Kuhn*
>> *Akka Tech Lead*
>> Typesafe <http://typesafe.com/> – Reactive apps on the JVM.
>> twitter: @rolandkuhn
>> <http://twitter.com/#!/rolandkuhn>
>>  
>>

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to