Hi Endre,

Thank you for your feedback. The problem is that the termination condition 
depends on the feedback loop being drained. Basically, if this stage 
had access to all stages between its output port and the feedback input 
port, I could check that

   - The 'external' input port of the merge operation is CLOSED_EMPTY
   - The 'feedback' input port of the merge operation is PULLED
   - The output port of the merge operation is PULLED
   - All input & output ports between the output port and the 'feedback' 
   input port of the merge operation are PULLED
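
The four conditions above can be written down as a single predicate. This is only a minimal sketch in plain Scala, not Akka API: the PortState hierarchy, LoopSnapshot and LoopTermination are hypothetical names, and the sketch assumes that some mechanism could hand the merge stage a consistent snapshot of all the ports in the loop.

```scala
// Hypothetical model of port states. Only CLOSED_EMPTY and PULLED are named
// in the message; Pushed is an assumed stand-in for "an element is in flight".
sealed trait PortState
case object ClosedEmpty extends PortState // upstream completed, nothing buffered
case object Pulled extends PortState      // demand signalled, no element in flight
case object Pushed extends PortState      // an element is in flight on this port

// Snapshot of every port the merge stage would need to inspect (hypothetical type).
final case class LoopSnapshot(
  externalIn: PortState,     // 'external' input port of the merge
  feedbackIn: PortState,     // 'feedback' input port of the merge
  mergeOut: PortState,       // output port of the merge
  loopPorts: Seq[PortState]  // all ports between mergeOut and feedbackIn
)

object LoopTermination {
  // True only when all four conditions from the list hold at the same time.
  def isDrained(s: LoopSnapshot): Boolean =
    s.externalIn == ClosedEmpty &&
      s.feedbackIn == Pulled &&
      s.mergeOut == Pulled &&
      s.loopPorts.forall(_ == Pulled)
}
```

A single port inside the loop that still carries an element is enough to make isDrained return false, which is the behaviour the list describes.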

However, there are at least two issues with this approach:

   - The merge stage is not aware of the state of all the stages' ports (at 
   least, I did not find how ;)) and even if it had access, it would be risky 
   to read the state of another stage running in another actor.
   - A stage pulling from its input port before pushing its current payload 
   (like a buffer or an asynchronous stage able to perform concurrent 
   operations) would defeat the detection.

If we don't care about the second issue, a solution would be to have a probe 
on each edge of the graph, maybe using a custom connector instead of '~>'.
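
To make the probe idea a bit more concrete, here is a rough sketch in plain Scala. Nothing in it is existing Akka API: EdgeProbe, ProbeRegistry and the signal names are hypothetical, and the code that would call onPull/onPush from a custom connector is left out. It also inherits the second issue above, since a stage that pulls before pushing its payload would make an edge look idle too early.

```scala
import java.util.concurrent.atomic.AtomicReference

// Last signal observed on an edge (hypothetical names).
sealed trait EdgeSignal
case object Idle extends EdgeSignal        // nothing observed yet
case object DemandSeen extends EdgeSignal  // downstream pulled, nothing pushed since
case object ElementSeen extends EdgeSignal // an element was pushed, no pull since

// One probe per edge. It only remembers the most recent signal that crossed it.
// AtomicReference is used because stages may run in different actors.
final class EdgeProbe(val name: String) {
  private val last = new AtomicReference[EdgeSignal](Idle)
  def onPull(): Unit = last.set(DemandSeen)
  def onPush(): Unit = last.set(ElementSeen)
  def isPulled: Boolean = last.get() == DemandSeen
}

// Collects the probes of one loop so that a single observer can query them.
// Registration is assumed to happen once, while the graph is being built.
final class ProbeRegistry {
  private var probes = Vector.empty[EdgeProbe]

  def register(name: String): EdgeProbe = {
    val p = new EdgeProbe(name)
    probes :+= p
    p
  }

  // True when every registered edge has outstanding demand and no element.
  def allPulled: Boolean = probes.nonEmpty && probes.forall(_.isPulled)

  // Names of the edges that are not in the pulled state, for debugging.
  def busyEdges: Seq[String] = probes.filterNot(_.isPulled).map(_.name)
}
```

The merge stage could then complete once its external input is closed and allPulled is true, without reading the internal state of any other stage.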

Jean-Pierre.
On Tuesday, December 1, 2015 at 9:29:01 AM UTC+1, drewhk wrote:
>
> Hi,
>
> In both cases, you have to know what your terminating condition is at the 
> merge site. Then you can use a custom GraphStage to implement the merge and 
> have the right termination logic: 
> http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0-M2/scala/stream-customize.html#custom-processing-with-graphstage
>
> I don't yet see how to provide such a built-in stage because the 
> termination condition can be very different in different use-cases. Will 
> think about it though, but it would be nice if you could contribute a 
> GraphStage that at least solved your problems.
>
> -Endre
>
> On Tue, Dec 1, 2015 at 2:02 AM, David Knapp <[email protected]> wrote:
>
>> I'm actually in pretty much the exact same position you're in, except 
>> instead of crawling for new URLs, I'm filtering back unfinished responses 
>> into my stream so I can wait until they're done. This is my graph
>>
>>
>> source ~> sync ~> merge.preferred
>> merge ~> broadcast
>> broadcast.out(0) ~> finishedFilter
>> broadcast.out(1) ~> unfinishedFilter
>> merge <~ syncStatusFlow <~ Limiter.limit[Long](Client.limiter, 10 seconds) <~ Flow[SyncStatus].map(_.account) <~ unfinishedFilter
>>
>>
>> (finishedFilter.outlet)
>>
>> On Monday, May 18, 2015 at 3:29:06 AM UTC-7, Jan Liße wrote:
>>>
>>> Hello,
>>>
>>> I'm currently building a scraper system on top of Akka Streams. I have 
>>> written a Flow that is able to follow paginated sites and scrape them in a 
>>> loop.
>>> For this I use a feedback merge. 
>>>
>>> My code: <https://gist.github.com/janlisse/f2672bf8bbee009ef009>
>>>
>>> scrapePaginated takes a function that decides if there are further pages 
>>> to scrape. If there are, it returns a Some() with the next URL as part of 
>>> the response tuple, and of course a None for the last page.
>>> The iteration and the feedback loop work and all pages are scraped 
>>> properly. But even when all URLs are processed, the stream never completes. 
>>> OnComplete never gets invoked.
>>> Is this expected behaviour? Or is there an error in my 
>>> scrapePaginated method? I read the docs' chapter on graph deadlocks and 
>>> liveness issues and finally added a buffer step with OverflowStrategy.Fail 
>>> to the feedback loop, but to no avail.
>>> If it helps to clarify the problem, I can provide a simple Spec that 
>>> reproduces the issue.
>>>
>>> Thanks in advance for any help!
>>>
>>> Jan  
>>>
>>>
>>> -- 
>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>> >>>>>>>>>> Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
>> --- 
>> You received this message because you are subscribed to the Google Groups 
>> "Akka User List" group.
>> To unsubscribe from this group and stop receiving emails from it, send an 
>> email to [email protected].
>> To post to this group, send email to [email protected].
>> Visit this group at http://groups.google.com/group/akka-user.
>> For more options, visit https://groups.google.com/d/optout.
>>
>
>
