On Thu, Mar 31, 2016 at 12:35 PM, <[email protected]> wrote:
> Thanks Endre, > > Without using the async though the stream locked up, which i had thought > was due to running it in one stream. > You should investigate that deadlock there. Maybe blocking calls? > Using a feedback loop would probably make it work better, just a bit > harder to implement than my current solution (I know actors somewhat better > than the streaming stuff). > You can implement it as a BidiStage, let's call it limiter, and then use it like this mySrc.via(limiter.join(myProcessingFlow.async)) This way you keep the complexity of the graph isolated in the BidiStage and can use the nice Flow API to compose it. > Also the stream is built from config rather than hard coded but if i know > where the two flows are i can connect them together. > > Robin > > On Thursday, March 31, 2016 at 11:10:46 AM UTC+1, drewhk wrote: >> >> Hi Robin, >> >> As a very first step I would recommend reading this section: >> http://doc.akka.io/docs/akka/2.4.2/scala/stream/stream-rate.html#Internal_buffers_and_their_effect >> >> >> The reason why an async boundary adds a buffer is because an async >> boundary makes no sense otherwise as without buffer both the downstream and >> upstream are forced to work in lockstep which is exactly as a synchronous >> model would, but without the overhead of async transfer. You can control >> the size of that buffer though. >> >> Alternatively, you can embed this in a larger graph, generating tokens >> yourself: >> >> yourSource -> zipWith -> doStuff -> [async boundary] -> doStuff -> bcast >> -> out >> zipWith <- concat <- [async boundary] <- bcast // feedback >> loop of token >> concat <- Source.single(Token) // Initial token is needed >> otherwise there is deadlock >> >> The above is a pseudocode that demonstrates how to use a graph with >> feedback cycle to limit the number of elements in a section that is >> independent of any intermediate buffering along the path. >> >> -Endre >> >> >> On Thu, Mar 31, 2016 at 12:00 PM, <[email protected]> wrote: >> >>> I'm working on building an application that uses Akka Streams to process >>> data.. >>> >>> *tl;dr* >>> >>> When i use .async() on a flow i notice the element to the left of it >>> (normally source in my case) is pulled numerous time, i'd like to prevent >>> this behaviour and only pull when the stream itself asks for it to pull. >>> >>> >>> *Details* >>> >>> >>> One of the element i'm working on is the idea of 'limiting' the number >>> of concurrent 'data chunks' in the stream. In this case we're processing >>> satellite data and each flow in the stream is processor that manipulates >>> the data. >>> >>> The input data is fairly large, but each processor then adds additional >>> data to it, i've so far made the stream iterator on one record at a time >>> but i would like the ability to only allow say 20 rows of data in the >>> stream at any one time. >>> >>> I've written two flows the use an actor to keep track on the number of >>> rows in the stream, when the first actor gets an onPush it asks the actor >>> if there is room for it? if so it pushes it, if not it waits for there to >>> be room. The second flow sits at the end and notifies the actor when a row >>> leaves the stream (well between the two flows). >>> >>> However i hit an issue where without using async on the first flow the >>> stream locks up (unsurprisingly), but when i add async to that flow i >>> notice that it looks like it starts buffering. My source's pull is called >>> multiple times right at the start and i see other 'strange' behaviour >>> because of this. Even if for instance i use .buffer to set the buffer to 1 >>> (can't be 0) i still get issues. >>> >>> Any suggestions on why i see this or if i've gone around this whole >>> limiting idea the completely wrong way :) >>> >>> Also i'm doing this in Java.. >>> >>> -- >>> >>>>>>>>>> Read the docs: http://akka.io/docs/ >>> >>>>>>>>>> Check the FAQ: >>> http://doc.akka.io/docs/akka/current/additional/faq.html >>> >>>>>>>>>> Search the archives: >>> https://groups.google.com/group/akka-user >>> --- >>> You received this message because you are subscribed to the Google >>> Groups "Akka User List" group. >>> To unsubscribe from this group and stop receiving emails from it, send >>> an email to [email protected]. >>> To post to this group, send email to [email protected]. >>> Visit this group at https://groups.google.com/group/akka-user. >>> For more options, visit https://groups.google.com/d/optout. >>> >> >> -- > >>>>>>>>>> Read the docs: http://akka.io/docs/ > >>>>>>>>>> Check the FAQ: > http://doc.akka.io/docs/akka/current/additional/faq.html > >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user > --- > You received this message because you are subscribed to the Google Groups > "Akka User List" group. > To unsubscribe from this group and stop receiving emails from it, send an > email to [email protected]. > To post to this group, send email to [email protected]. > Visit this group at https://groups.google.com/group/akka-user. > For more options, visit https://groups.google.com/d/optout. > -- >>>>>>>>>> Read the docs: http://akka.io/docs/ >>>>>>>>>> Check the FAQ: >>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to [email protected]. To post to this group, send email to [email protected]. Visit this group at https://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
