For my problem, I finally found a solution, though it may not work in all 
cases. It works because I know that an element flowing through the feedback 
loop cannot be multiplied (in my case there can be at most one 'next url' 
in the page currently fetched). By changing extractNextUrl to output 
Option[T] instead of T, I can track at the merge site whether there is 
another page to crawl, and thus decide when to finish the stage.

                               broadcast ~> sink
source ~> merge ~> fetchUrl ~> broadcast ~> extractNextUrl
          merge.preferred   <~              extractNextUrl

Here is what I want to do: 
- The extractNextUrl stage will send Some(t) when a next url is found, and 
None when there is none.
- The 'merge' stage will count outstanding elements in the feedback loop, 
i.e. the difference between its output count and its merge.preferred input 
count (including the Nones). 
- When the first input port of the merge is closed, the last None has been 
received from extractNextUrl, and there are no outstanding elements, the 
stage finishes.
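As a sanity check of this bookkeeping, independent of Akka, here is a sequential model of the loop (the names LoopAccounting and extractNext are made up for illustration). Each element entering the loop increments an outstanding counter, and every Option coming back on the feedback side decrements it; completion is only allowed once the external input is exhausted and the counter is back to zero:

```scala
// Sequential model of the feedback loop: every element sent downstream
// comes back to the merge exactly once, as Some(nextUrl) or None.
object LoopAccounting {
  def run[T](external: List[T], extractNext: T => Option[T]): (List[T], Int) = {
    var outstanding = 0            // elements in flight inside the loop
    var processed = List.empty[T]
    var queue = external           // external input plus re-entering pages

    while (queue.nonEmpty) {
      val elem = queue.head
      queue = queue.tail
      outstanding += 1             // pushed into fetchUrl ~> extractNextUrl
      processed ::= elem
      extractNext(elem) match {
        case Some(next) => queue = queue :+ next // re-enters via merge.preferred
        case None       => ()                    // last page of this chain
      }
      outstanding -= 1             // its Option arrived back at the merge
    }
    // outstanding == 0 here is exactly the condition under which the
    // real merge stage would be allowed to complete.
    (processed.reverse, outstanding)
  }
}
```

In the real stage the decrement would happen in the merge.preferred input handler rather than inline, but the completion condition is the same: external port closed, outstanding count at zero.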

I'll try to implement this solution in a custom merge stage this weekend 
and I'll let you know if it works.

Regards,
Jean-Pierre.


On Wednesday, December 2, 2015 at 8:58:38 PM UTC+1, David Knapp wrote:
>
> I agree with your proposed solution. Maybe a Source that emits a "this is 
> the last element" marker, or something that the stage can handle 
> implicitly. 
>
> On Tuesday, December 1, 2015 at 5:08:29 AM UTC-7, Jean-Pierre Thomasset 
> wrote:
>>
>> Thank you for the message,
>>
>> I thought of that, but since the stream approach worked really well when 
>> the flow is uninterrupted, I tried to find a solution to the completion 
>> problem. I think there are multiple cases of feedback-loop completion 
>> deadlock, and I was hoping there was a nice solution.
>>
>> One solution would be to have some kind of out-of-band message passing 
>> through the graph, or some kind of tryClose/tryFinish operation that would 
>> fail on the first stage still processing data. That would allow sending the 
>> operation or message into the feedback loop and waiting until it reaches 
>> the merge operation again. This is something I tried to implement using an 
>> Either[T, PoisonPill] flowing through the graph, but it requires each stage 
>> to handle this type, which can be a pain.
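The Either-wrapping idea described here can be sketched in plain Scala (the names OutOfBand, Signal, and liftStage are hypothetical; only the Either[T, PoisonPill] shape comes from the message above). Each ordinary per-element function is lifted so that it processes data values and passes the pill through untouched:

```scala
object OutOfBand {
  // Hypothetical out-of-band termination marker flowing through the graph.
  case object PoisonPill
  type Signal[T] = Either[T, PoisonPill.type]

  // Lift an ordinary per-element function so the pill passes through unchanged.
  def liftStage[A, B](f: A => B): Signal[A] => Signal[B] = {
    case Left(a)  => Left(f(a))
    case Right(p) => Right(p)
  }
}
```

This makes the pain point concrete: every stage in the cycle has to be lifted this way, and the pill only works if no stage in the loop drops, buffers indefinitely, or multiplies it.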
>>
>> Regards,
>> Jean-Pierre.
>>
>> On Tuesday, December 1, 2015 at 12:23:53 PM UTC+1, drewhk wrote:
>>>
>>> Hi,
>>>
>>> In this case I don't see the point of modeling the loop as a stream. By 
>>> making it a stream you disperse all this state among concurrent 
>>> entities, when in reality it is state that needs to live in one place. 
>>> If that is the case then this is clearly not a good fit for streams. You 
>>> can make this complex processing part of a custom stage, or you can create 
>>> an actor for it and tie it to your other streaming parts. 
>>>
>>> -Endre
>>>
>>> On Tue, Dec 1, 2015 at 12:14 PM, Jean-Pierre Thomasset <
>>> [email protected]> wrote:
>>>
>>>> Hi Endre,
>>>>
>>>> Thank you for your feedback. The problem is that the termination 
>>>> condition depends on the feedback loop being drained. Basically, if this 
>>>> stage 
>>>> had access to all stages between its output port and the feedback input 
>>>> port, I could check that
>>>>
>>>>    - The 'external' input port of the merge operation is CLOSED_EMPTY
>>>>    - The 'feedback' input port of the merge operation is PULLED
>>>>    - The output port of the merge operation is PULLED
>>>>    - All input & output ports between the output port and the 
>>>>    'feedback' input port of the merge operation are PULLED
>>>>
>>>> However, there are at least two issues with this approach: 
>>>>
>>>>    - The merge stage is not aware of the state of all the stages' ports 
>>>>    (at least, I did not find out how ;)), and even if it did, it would 
>>>>    be risky to read the state of another stage running in another actor.
>>>>    - A stage pulling from its input port before pushing its current 
>>>>    payload (like a buffer, or an asynchronous stage able to perform 
>>>>    concurrent operations) would defeat the detection.
>>>>
>>>> If we don't care about the second issue, a solution would be to have a 
>>>> probe at each edge of the graph, maybe using a custom connector instead 
>>>> of '~>'.
>>>>
>>>> Jean-Pierre.
>>>> On Tuesday, December 1, 2015 at 9:29:01 AM UTC+1, drewhk wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> In both cases, you have to know what your terminating condition is at 
>>>>> the merge site. Then you can use a custom GraphStage to implement the 
>>>>> merge 
>>>>> and get the right termination logic: 
>>>>> http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0-M2/scala/stream-customize.html#custom-processing-with-graphstage
>>>>>
>>>>> I don't yet see how to provide such a built-in stage, because the 
>>>>> termination condition can be very different in different use cases. I 
>>>>> will think about it though, and it would be nice if you could 
>>>>> contribute a GraphStage that at least solves your problem.
>>>>>
>>>>> -Endre
>>>>>
>>>>> On Tue, Dec 1, 2015 at 2:02 AM, David Knapp <[email protected]> 
>>>>> wrote:
>>>>>
>>>>>> I'm actually in pretty much the exact same position you're in, except 
>>>>>> instead of crawling for new URLs, I'm filtering back unfinished 
>>>>>> responses 
>>>>>> into my stream so I can wait until they're done. This is my graph
>>>>>>
>>>>>>
>>>>>> source ~> sync ~> merge.preferred
>>>>>>                   merge ~> broadcast
>>>>>>                            broadcast.out(0) ~> finishedFilter
>>>>>>                            broadcast.out(1) ~> unfinishedFilter
>>>>>> merge <~ syncStatusFlow <~ Limiter.limit[Long](Client.limiter, 10 seconds) <~
>>>>>> Flow[SyncStatus].map(_.account) <~ unfinishedFilter
>>>>>>
>>>>>> (finishedFilter.outlet)
>>>>>>
>>>>>> On Monday, May 18, 2015 at 3:29:06 AM UTC-7, Jan Liße wrote:
>>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> I'm currently building a scraper system on top of Akka Streams. I 
>>>>>>> have written a Flow that is able to follow paginated sites and scrape 
>>>>>>> them 
>>>>>>> in a loop. For this I use a feedback merge. 
>>>>>>>
>>>>>>> My code: <https://gist.github.com/janlisse/f2672bf8bbee009ef009>
>>>>>>>
>>>>>>> scrapePaginated takes a function that decides whether there are 
>>>>>>> further pages to scrape. If there are, it returns a Some() with the 
>>>>>>> next url as part of the response tuple, and of course a None for the 
>>>>>>> last page.
>>>>>>> The iteration and the feedback loop work, and all pages are scraped 
>>>>>>> properly. But even when all URLs are processed, the stream never 
>>>>>>> completes; onComplete never gets invoked.
>>>>>>> Is this expected behaviour, or is there an error in my 
>>>>>>> scrapePaginated method? I read the docs' chapter on graph deadlocks 
>>>>>>> and liveness issues and eventually added a buffer step with 
>>>>>>> OverflowStrategy.Fail to the feedback loop, but to no avail.
>>>>>>> If it helps to clarify the problem, I can provide a simple Spec that 
>>>>>>> reproduces the issue.
>>>>>>>
>>>>>>> Thanks in advance for any help!
>>>>>>>
>>>>>>> Jan  
>>>>>>>
>>>>>>>
>>>>>>> -- 
>>>>>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>> >>>>>>>>>> Check the FAQ: 
>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>> >>>>>>>>>> Search the archives: 
>>>>>> https://groups.google.com/group/akka-user
>>>>>> --- 
>>>>>> You received this message because you are subscribed to the Google 
>>>>>> Groups "Akka User List" group.
>>>>>> To unsubscribe from this group and stop receiving emails from it, 
>>>>>> send an email to [email protected].
>>>>>> To post to this group, send email to [email protected].
>>>>>> Visit this group at http://groups.google.com/group/akka-user.
>>>>>> For more options, visit https://groups.google.com/d/optout.
>>>>>>
>>>>>
>>>>> -- 
>>>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>>>> >>>>>>>>>> Check the FAQ: 
>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>> >>>>>>>>>> Search the archives: 
>>>> https://groups.google.com/group/akka-user
>>>> --- 
>>>> You received this message because you are subscribed to the Google 
>>>> Groups "Akka User List" group.
>>>> To unsubscribe from this group and stop receiving emails from it, send 
>>>> an email to [email protected].
>>>> To post to this group, send email to [email protected].
>>>> Visit this group at http://groups.google.com/group/akka-user.
>>>> For more options, visit https://groups.google.com/d/optout.
>>>>
>>>
>>>

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.
