I agree with your proposed solution. Maybe a Source that emits a "this is the last element" element, but one that stages can handle implicitly, or something along those lines.
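To picture the marker idea discussed in this thread (Jean-Pierre's Either[T, PoisonPill] quoted below), here is a plain Scala simulation. It is a sketch under stated assumptions and is not Akka code. The whole loop (merge plus feedback stages) is modeled as one FIFO queue, so it only holds if every stage in the loop preserves element order. `Msg`, `Data`, `Marker`, `MarkerLoop.runLoop`, the `step` function and the page map are all hypothetical names. Once the external input is done, a single marker is injected. When the marker returns to the merge and nothing was fed back behind it, the loop has dried out.

```scala
import scala.collection.mutable

// Hypothetical message type: a payload or the single end-of-input marker
// (the Either[T, PoisonPill] idea expressed as a small ADT).
sealed trait Msg[+T]
final case class Data[T](value: T) extends Msg[T]
case object Marker extends Msg[Nothing]

object MarkerLoop {
  /** Simulates merge + feedback loop as one FIFO queue (assumes every stage
    * in the loop preserves order). `step` handles one payload and returns
    * Left(out) for results leaving the loop and Right(next) for elements
    * fed back to the merge.
    */
  def runLoop[T, O](external: List[T])(step: T => List[Either[O, T]]): List[O] = {
    val loop = mutable.Queue.empty[Msg[T]]
    external.foreach(t => loop.enqueue(Data(t)))
    loop.enqueue(Marker) // external input has completed: inject one marker

    val out = mutable.ListBuffer.empty[O]
    var fedBackSinceMarker = false
    var done = false

    while (!done) {
      loop.dequeue() match {
        case Data(value) =>
          step(value).foreach {
            case Left(result) => out += result
            case Right(next) =>
              loop.enqueue(Data(next)) // feedback edge into the merge
              fedBackSinceMarker = true
          }
        case Marker =>
          if (fedBackSinceMarker) {
            // Payload is still circulating behind the marker: send it around again.
            fedBackSinceMarker = false
            loop.enqueue(Marker)
          } else {
            done = true // a full pass with no feedback: the loop has dried out
          }
      }
    }
    out.toList
  }
}

object MarkerLoopExample {
  // Hypothetical pagination: each page may link to a next page.
  val pages: Map[String, Option[String]] =
    Map("p1" -> Some("p2"), "p2" -> Some("p3"), "p3" -> None)

  def scrape(url: String): List[Either[String, String]] =
    Left(s"scraped $url") :: pages(url).toList.map(next => Right(next))

  def main(args: Array[String]): Unit =
    println(MarkerLoop.runLoop(List("p1"))(scrape))
}
```

Starting from "p1", the marker is sent around again twice (after "p1" and "p2" feed back) before it completes a pass with nothing behind it. At that point all three pages have been scraped and the loop stops. The cost of this design is that every stage in the loop must let the marker through unchanged, which is exactly the pain Jean-Pierre describes.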
On Tuesday, December 1, 2015 at 5:08:29 AM UTC-7, Jean-Pierre Thomasset wrote:
>
> Thank you for the message,
>
> I thought of that, but as the stream approach worked really well when the flow is uninterrupted, I tried to find a solution to the completion problem. I think there are multiple cases of feedback-loop completion deadlock, and I was hoping there was a nice solution.
>
> One solution would be to have some kind of out-of-band message passing through the graph, or some kind of tryClose/tryFinish operation that would fail on the first stage still processing data. That would allow sending the operation or message into the feedback loop and waiting until it reaches the merge operation again. I tried to implement this using an Either[T, PoisonPill] flowing through the graph, but it requires each stage to handle this type, which can be a pain.
>
> Regards,
> Jean-Pierre.
>
> On Tuesday, December 1, 2015 at 12:23:53 PM UTC+1, drewhk wrote:
>>
>> Hi,
>>
>> In this case I don't see the reason for modeling the loop as a stream. By making it a stream, you disperse all of this state among concurrent entities, but in reality it is all state that needs to be in one place. If that is the case, then this is clearly not a good fit for streams. You can make this complex processing part of a custom stage, or you can create an actor for it and tie it to your other streaming parts.
>>
>> -Endre
>>
>> On Tue, Dec 1, 2015 at 12:14 PM, Jean-Pierre Thomasset <[email protected]> wrote:
>>
>>> Hi Endre,
>>>
>>> Thank you for your feedback. The problem is that the termination condition depends on the feedback loop being dried out.
>>> Basically, if this stage had access to all stages between its output port and the feedback input port, I could check that:
>>>
>>> - The 'external' input port of the merge operation is CLOSED_EMPTY
>>> - The 'feedback' input port of the merge operation is PULLED
>>> - The output port of the merge operation is PULLED
>>> - All input & output ports between the output port and the 'feedback' input port of the merge operation are PULLED
>>>
>>> However, there are at least two issues with this process:
>>>
>>> - The merge stage is not aware of the state of all the stages' ports (at least, I did not find how ;)), and even if it had access, it would be risky to access the state of another stage running in another actor.
>>> - A stage pulling from its input port before pushing its current payload (like a buffer, or an asynchronous stage able to perform concurrent operations) would defeat the detection.
>>>
>>> If we don't care about the second issue, a solution would be to have a probe at each edge of the graph, maybe using a custom connector instead of '~>'.
>>>
>>> Jean-Pierre.
>>>
>>> On Tuesday, December 1, 2015 at 9:29:01 AM UTC+1, drewhk wrote:
>>>>
>>>> Hi,
>>>>
>>>> In both cases, you have to know what your terminating condition is at the merge site. Then you can use a custom GraphStage to implement the merge and have the right termination logic:
>>>> http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0-M2/scala/stream-customize.html#custom-processing-with-graphstage
>>>>
>>>> I don't yet see how to provide such a built-in stage, because the termination condition can be very different in different use cases. I will think about it, though, but it would be nice if you could contribute a GraphStage that at least solved your problems.
>>>>
>>>> -Endre
>>>>
>>>> On Tue, Dec 1, 2015 at 2:02 AM, David Knapp <[email protected]> wrote:
>>>>
>>>>> I'm actually in pretty much the exact same position you're in, except that instead of crawling for new URLs, I'm filtering unfinished responses back into my stream so I can wait until they're done. This is my graph:
>>>>>
>>>>>   source ~> sync ~> merge.preferred
>>>>>   merge ~> broadcast
>>>>>   broadcast.out(0) ~> finishedFilter
>>>>>   broadcast.out(1) ~> unfinishedFilter
>>>>>   merge <~ syncStatusFlow <~ Limiter.limit[Long](Client.limiter, 10 seconds) <~ Flow[SyncStatus].map(_.account) <~ unfinishedFilter
>>>>>
>>>>>   (finishedFilter.outlet)
>>>>>
>>>>> On Monday, May 18, 2015 at 3:29:06 AM UTC-7, Jan Liße wrote:
>>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> I'm currently building a scraper system on top of Akka Streams. I have written a Flow that is able to follow paginated sites and scrape them in a loop. For this I use a feedback merge.
>>>>>>
>>>>>> My code: <https://gist.github.com/janlisse/f2672bf8bbee009ef009>
>>>>>>
>>>>>> scrapePaginated takes a function that decides whether there are further pages to scrape. If there are, it returns a Some() with the next URL as part of the response tuple, and of course a None for the last page.
>>>>>> The iteration and the feedback loop work, and all pages are scraped properly. But even when all URLs are processed, the stream never completes: OnComplete never gets invoked.
>>>>>> Is this expected behaviour? Or is there an error in my scrapePaginated method? I read the docs' chapter on graph deadlocks and liveness issues and finally added a buffer step with OverflowStrategy.fail to the feedback loop, but to no avail.
>>>>>>
>>>>>> If it helps to clarify the problem, I can provide a simple Spec that reproduces the issue.
>>>>>>
>>>>>> Thanks in advance for any help!
>>>>>>
>>>>>> Jan
>>>>>>
>>>>>
>>>>> --
>>>>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
>>>>> ---
>>>>> You received this message because you are subscribed to the Google Groups "Akka User List" group.
>>>>> To unsubscribe from this group and stop receiving emails from it, send an email to [email protected].
>>>>> To post to this group, send email to [email protected].
>>>>> Visit this group at http://groups.google.com/group/akka-user.
>>>>> For more options, visit https://groups.google.com/d/optout.
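Endre's advice above (decide the terminating condition at the merge site and keep the loop's state in one place) can be illustrated with a counter of in-flight elements. The sketch below is plain Scala, not an Akka GraphStage. `LoopCompletionTracker`, `LoopCrawl.crawl` and the link map are hypothetical names. It assumes every element pushed into the loop yields exactly one result that is routed back to the merge (Some(next) to continue, None when that branch is finished) instead of being filtered out inside the loop. In a real custom merge stage, these calls would presumably be made from the stage's input and output handlers. Unlike the PULLED-port check discussed above, the count does not depend on stages passing elements through in order.

```scala
import scala.collection.mutable.ListBuffer

/** Hypothetical bookkeeping a custom merge stage could keep in one place.
  * Assumes every element pushed into the loop comes back to the merge
  * exactly once, as either a continuation or a "finished" result.
  */
final class LoopCompletionTracker {
  private var inFlight = 0
  private var externalClosed = false

  def onPushIntoLoop(): Unit = inFlight += 1

  def onResultReturned(): Unit = {
    require(inFlight > 0, "a result returned that was never pushed into the loop")
    inFlight -= 1
  }

  def onExternalComplete(): Unit = externalClosed = true

  /** Safe to complete only when no more external input can arrive
    * and nothing is still circulating in the feedback loop. */
  def canComplete: Boolean = externalClosed && inFlight == 0
}

object LoopCrawl {
  /** Single-threaded simulation of the loop. Elements are taken LIFO on
    * purpose, to show the count does not rely on FIFO ordering. */
  def crawl(seeds: List[String], next: String => Option[String]): List[String] = {
    val tracker = new LoopCompletionTracker
    var inLoop = List.empty[String] // elements currently inside the loop
    val visited = ListBuffer.empty[String]

    seeds.foreach { s => tracker.onPushIntoLoop(); inLoop = s :: inLoop }
    tracker.onExternalComplete() // the external source has finished

    while (!tracker.canComplete) {
      val url = inLoop.head
      inLoop = inLoop.tail
      visited += url
      val result = next(url)     // exactly one result per element
      tracker.onResultReturned() // that result arrives back at the merge
      result.foreach { n => tracker.onPushIntoLoop(); inLoop = n :: inLoop }
    }
    visited.toList
  }
}
```

The key design choice is that the knowledge of "this branch is finished" must reach the merge. An element silently dropped by a filter inside the loop would leave the count stuck above zero, which is the same never-completing symptom described in this thread.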
