Hi,

in akka-stream, processing is usually run in a fused fashion, i.e. without 
further configuration one stream will run in a single actor so all 
operations are run sequentially. In such a synchronous scenario, there's 
little room for elements to ever get dropped because the actorRef stage 
basically always needs to wait for the consumer stage to finish before it 
can do its own work. At that point the `foreach` stage already can process 
the next element. Fused processing also means that `Thread.sleep` is bad 
thing to do as it will block stream infrastructure and dispatcher threads 
from doing their work.

Try using `mapAsync()` with `akka.pattern.after` to wait (or actually do 
processing) without blocking infrastructure and it will probably start to 
work.

Johannes


On Thursday, January 25, 2018 at 8:03:11 AM UTC+1, sal...@thoughtworks.com 
wrote:
>
> Hello,
>
> We are having a requirement that if a consumer is slower than producer 
> then discard all the elements that cannot be consumed and whenever the 
> consumer gets ready, feed the latest element from producer.
>
> We tried an approach as follows:
>
> Source.actorRef(0, OverflowStrategy.dropHead)       // actor receives data 
>> at every 10 milliseconds
>
> .runWith {
>>    println("data received")
>>    Thread.sleep(1000)               // mimic consumer processing data in 
>> every 1 second
>> }
>
>
> We shrank the buffer size to 1 (minimal possible) with following settings
>
> private val actorMaterializerSettings = ActorMaterializerSettings(
>> actorSystem).withInputBuffer(1, 1)
>
>
> With this buffer size, Sink pulls data 1 to consume and data 2 to put in 
> buffer at initialization.
>
> While data 1 is getting processed we are dropping data from producer.
>
> When data 1 gets processed after 1000 milliseconds (1 second) ideally I 
> should receive data 10 (and drop 2 - 9 as consumer is slow) but instead I 
> receive data 2 from the buffer. data 2 in our domain is extremely useless 
> as it is stale.
>
> Is there a way to disable buffer at Sink totally and always pull latest 
> data from Source ?
>
>
>

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to