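I think your emitFeedback() method should look like this:

def emitFeedback(): Unit = {
  grab(feedback) match {
    case Some(e) =>
      // Only count emits that will actually run the `emitted` callback
      feedbackEmitting += 1
      flowingCount += 1
      emit(out, e, emitted)
    case None => checkComplete()
  }
  tryPull(feedback)
}

Otherwise feedbackEmitting is also incremented for a None. A None emits
nothing, so the emitted callback that decrements the counter never runs,
feedbackEmitting never returns to zero, and the main input (in) stays
blocked as soon as the first None arrives on the feedback port.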
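To make the difference concrete, here is a minimal plain-Scala model of the feedbackEmitting bookkeeping. It is only a sketch under stated assumptions: no Akka is involved, EmitCounter, onFeedback and deliverAll are hypothetical names, and deliverAll() stands in for Akka running the emitted callback once per completed emit.

```scala
// Plain-Scala model (no Akka) of the feedbackEmitting counter.
// `fixed = false` follows the original emitFeedback(), `fixed = true` the
// version proposed above. deliverAll() simulates Akka running the `emitted`
// callback once for every element that was actually emitted.
final class EmitCounter(fixed: Boolean) {
  var feedbackEmitting = 0
  private var pendingEmits = 0 // emits whose `emitted` callback has not run

  // Mirrors emitFeedback(): a Some is emitted, a None is only checked
  def onFeedback(elem: Option[String]): Unit = {
    if (!fixed) feedbackEmitting += 1 // original: counted even for None
    elem match {
      case Some(_) =>
        if (fixed) feedbackEmitting += 1 // fixed: count real emits only
        pendingEmits += 1
      case None => () // nothing emitted, so no callback will ever decrement
    }
  }

  // Mirrors the `emitted` callback running for each completed emit
  def deliverAll(): Unit =
    while (pendingEmits > 0) {
      pendingEmits -= 1
      feedbackEmitting -= 1
    }

  // The main input is only emitted while no feedback emit is in flight
  def mainInputUnblocked: Boolean = feedbackEmitting == 0
}
```

With fixed = false, a Some followed by a None leaves feedbackEmitting at 1 after every emit has completed, so the main input is never released; with fixed = true it drops back to 0.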
On Monday, December 7, 2015 at 7:06:19 AM UTC-7, Jean-Pierre Thomasset wrote:
>
> Hello,
>
> Here is the implementation of a custom feedback stage. It has three
> ports:
> - out [T]: the output of the merge operation performed by this stage
> - in [T]: the main input port (outside the feedback loop)
> - feedback [Option[T]]: the input port of the feedback loop
>
> Keep in mind that it only works if the number of elements flowing out
> is exactly the number flowing back in through the feedback port. The
> stage completes once the main input port (in) is closed and there are
> no outstanding elements in the feedback loop (the last element coming
> back through this loop must be a None for the stage to complete).
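The completion rule above can be modelled as a single counter. This is a hypothetical sketch (LoopTracker and its methods are illustrative names, not Akka API) that mirrors flowingCount and checkComplete() in the stage code, and it assumes the same invariant: exactly one Option comes back for every element emitted.

```scala
// Plain-Scala model (no Akka) of the Feedback stage's completion rule.
// flowingCount = elements emitted on `out` that have not yet come back
// through the `feedback` port as Some(_) or None.
final class LoopTracker {
  private var flowingCount = 0
  private var inClosed = false
  var completed = false

  // An element from the main input `in` is emitted downstream
  def emitFromMain(): Unit = flowingCount += 1

  // Mirrors feedback onPush + emitFeedback(): the element came back...
  def onFeedback[T](elem: Option[T]): Unit = {
    flowingCount -= 1
    elem match {
      case Some(_) => flowingCount += 1 // ...and goes around the loop again
      case None    => checkComplete()   // ...and leaves the loop for good
    }
  }

  // Mirrors onUpstreamFinish() on the main input
  def closeMain(): Unit = { inClosed = true; checkComplete() }

  private def checkComplete(): Unit =
    if (flowingCount == 0 && inClosed) completed = true
}
```

If the loop operation ever sent back two Options for one element, flowingCount would reach zero too early (or go negative), which is why the one-to-one invariant matters.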
>
> import akka.stream._
> import akka.stream.FanInShape.Init
> import akka.stream.stage._
>
> class FeedbackShape[T](val _init: FanInShape.Init[T])
>     extends FanInShape[T](_init) {
>   def this() = this(FanInShape.Name[T]("FeedbackShape"))
>
>   override protected def construct(init: Init[T]): FanInShape[T] =
>     new FeedbackShape(init)
>   override def deepCopy(): FeedbackShape[T] =
>     super.deepCopy().asInstanceOf[FeedbackShape[T]]
>
>   // `out` is inherited from FanInShape; these are the two extra inlets
>   val in = newInlet[T]("in")
>   val feedback = newInlet[Option[T]]("feedback")
> }
>
> class Feedback[T] extends GraphStage[FeedbackShape[T]] {
>   // Define the shape of this stage: the FeedbackShape declared above,
>   // i.e. a FanInShape with the extra `in` and `feedback` inlets
>   override val shape: FeedbackShape[T] = new FeedbackShape[T]
>
>   def in: Inlet[T] = shape.in
>   def out: Outlet[T] = shape.out
>   def feedback: Inlet[Option[T]] = shape.feedback
>
>   // This is where the actual (possibly stateful) logic will live
>   override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
>     new GraphStageLogic(shape) {
>       // Number of elements emitted on `out` that have not come back yet
>       var flowingCount = 0
>
>       def checkComplete(): Unit = {
>         // Complete this stage if there is no data left in the feedback loop
>         if (flowingCount == 0 && isClosed(in)) {
>           completeStage()
>         }
>       }
>
>       def pullSecondary(): Unit = tryPull(in)
>
>       /**
>        * Output port handler: on the first demand, start pulling both inputs
>        */
>       setHandler(out, new OutHandler {
>         private var first = true
>         override def onPull(): Unit = {
>           if (first) {
>             first = false
>             tryPull(feedback)
>             tryPull(in)
>           }
>         }
>       })
>
>       // Intended to count feedback emits still in progress; the main
>       // input is held back while it is non-zero
>       var feedbackEmitting = 0
>
>       /**
>        * Preferred input port handler (feedback loop)
>        */
>       setHandler(feedback, new InHandler {
>         override def onUpstreamFinish(): Unit = checkComplete()
>         override def onPush(): Unit = {
>           /* Received back the element in the feedback loop */
>           flowingCount -= 1
>           emitFeedback()
>         }
>
>         def emitFeedback(): Unit = {
>           feedbackEmitting += 1
>
>           grab(feedback) match {
>             case Some(e) =>
>               flowingCount += 1
>               emit(out, e, emitted)
>             case None => checkComplete()
>           }
>           tryPull(feedback)
>         }
>
>         val emitted = () ⇒ {
>           feedbackEmitting -= 1
>           if (isAvailable(feedback)) emitFeedback()
>           else if (feedbackEmitting == 0) emitSecondary()
>         }
>
>         def emitSecondary(): Unit = {
>           if (isAvailable(in)) {
>             flowingCount += 1
>             emit(out, grab(in), pullSecondary)
>           }
>         }
>       })
>
>       /**
>        * Secondary input port handler (main input)
>        */
>       setHandler(in, new InHandler {
>         override def onPush(): Unit = {
>           if (feedbackEmitting == 0) {
>             flowingCount += 1
>             emit(out, grab(in), pullSecondary)
>           }
>         }
>         override def onUpstreamFinish(): Unit = checkComplete()
>       })
>     }
> }
>
> It can be used in a graph of this shape:
> source ~> feedback.in
> feedback.out ~> broadcast ~> output/sink
> feedback.feedback <~ loopOperation <~ broadcast
>
> where loopOperation must decide whether to send the data back into the
> loop, by sending either
> - Some(t): if the loop shall continue
> - None: if the loop must end for the given input
>
> The feedback stage is responsible for unboxing the Option[T] sent to
> its feedback port and emitting it as a T on the out port.
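The intended semantics of the wiring above can be checked without Akka using a small synchronous model. This is a sketch only: FeedbackLoopModel, run and the Map-based loopOperation in the test are hypothetical, and the model processes one element at a time with no back-pressure or concurrency. It does follow the same rules as the stage: the feedback port is preferred, Some(t) is unboxed and emitted again, None leaves the loop, and the run ends once the main input is exhausted and flowingCount is back to zero.

```scala
import scala.collection.mutable

// Synchronous, single-threaded model (no Akka) of the graph
//   source ~> feedback.in; feedback.out ~> broadcast ~> sink
//   feedback.feedback <~ loopOperation <~ broadcast
// loopOperation answers Some(next) to keep looping, None to stop.
object FeedbackLoopModel {
  def run[T](source: List[T], loopOperation: T => Option[T]): List[T] = {
    val sink = mutable.ListBuffer.empty[T]
    val feedbackQueue = mutable.Queue.empty[Option[T]]
    var pending = source // elements still waiting on the main input `in`
    var flowingCount = 0 // always equals feedbackQueue.size in this model

    // Emit on `out`: broadcast to the sink and to loopOperation
    def emit(elem: T): Unit = {
      flowingCount += 1
      sink += elem
      feedbackQueue.enqueue(loopOperation(elem))
    }

    // Loop until `in` is exhausted and nothing is left inside the loop
    while (pending.nonEmpty || flowingCount > 0) {
      if (feedbackQueue.nonEmpty) { // the feedback port is preferred
        flowingCount -= 1
        feedbackQueue.dequeue().foreach(emit) // unbox Some(t) into t
      } else {
        emit(pending.head)
        pending = pending.tail
      }
    }
    sink.toList
  }
}
```

For example, with a hypothetical pagination map where "a1" links to "a2", "a2" to "a3" and "b1" to "b2", running the model on List("a1", "b1") emits every page of "a" before starting "b", then terminates.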
>
> I hope this will help you.
>
> Best Regards,
> JP.
>
>
> On Thursday, December 3, 2015 at 12:10:27 PM UTC+1, Jean-Pierre Thomasset wrote:
>>
>> For my problem, I finally found a solution, but it may not work in all
>> cases. It is possible because I know that an object flowing in the
>> feedback loop cannot be multiplied (in my case there can be at most one
>> 'next url' in the page currently fetched). So by changing extractNextUrl
>> to output Option[T] instead of T, I can track at the merge site whether
>> there is another page to crawl, and thus decide whether I need to finish
>> this stage.
>>
>> broadcast ~> sink
>> source ~> merge ~> fetchUrl ~> broadcast ~> extractNextUrl
>> merge.preferred <~ extractNextUrl
>>
>> Here is what I want to do:
>> - extractNextUrl will send Some(t) when a next url is found, and None
>> when there is none.
>> - The 'merge' stage will count the outstanding data in the feedback loop,
>> i.e. the difference between the output count and the merge.preferred
>> input count (including None).
>> - If the first input port of the merge operation is closed, then upon
>> reception of the last None from extractNextUrl, if there are no
>> outstanding requests, finish this stage.
>>
>> I'll try to implement this solution in a custom merge stage this weekend
>> and I'll let you know if it works.
>>
>> Regards,
>> Jean-Pierre.
>>
>>
>> On Wednesday, December 2, 2015 at 8:58:38 PM UTC+1, David Knapp wrote:
>>>
>>> I agree with your proposed solution. Maybe a Source that emits a "this
>>> is the last element" marker, but one that the stages can handle
>>> implicitly.
>>>
>>> On Tuesday, December 1, 2015 at 5:08:29 AM UTC-7, Jean-Pierre Thomasset wrote:
>>>>
>>>> Thank you for the message,
>>>>
>>>> I thought of that, but as the stream approach worked really well when
>>>> the flow is uninterrupted, I tried to find a solution to the completion
>>>> problem. I think there are multiple cases of feedback-loop completion
>>>> deadlock, and I was hoping there was a nice solution.
>>>>
>>>> One solution would be some kind of out-of-band message passing through
>>>> the graph, or some kind of tryClose/tryFinish operation that would fail
>>>> on the first stage still processing data. That would allow sending the
>>>> operation or message into the feedback loop and waiting until it comes
>>>> back to the merge operation. This is something I tried to implement
>>>> using an Either[T, PoisonPill] flowing in the graph, but it requires
>>>> each stage to handle this type, which can be a pain.
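A rough sketch of why this gets painful, and how a helper can hide part of it for simple steps. Here PoisonPill is a hypothetical case object (not akka.actor.PoisonPill), and Msg and lift are illustrative names. Only one-to-one steps are covered; stateful stages such as buffers would still need their own handling of the marker.

```scala
// Hypothetical marker sent into the loop behind the real data.
// (Not akka.actor.PoisonPill; just an illustrative case object.)
case object PoisonPill

object PoisonPillSupport {
  type Msg[T] = Either[T, PoisonPill.type]

  // Lift an ordinary step T => U so that it forwards the marker untouched.
  // Without a helper like this, every stage has to pattern-match on Either.
  def lift[T, U](f: T => U): Msg[T] => Msg[U] = {
    case Left(t)  => Left(f(t))
    case Right(p) => Right(p)
  }
}
```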
>>>>
>>>> Regards,
>>>> Jean-Pierre.
>>>>
>>>> On Tuesday, December 1, 2015 at 12:23:53 PM UTC+1, drewhk wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> In this case I don't see the reason for modeling the loop as a stream.
>>>>> By making it a stream you disperse all this state among concurrent
>>>>> entities, but in reality this is state that needs to be in one place.
>>>>> If this is the case, then this is clearly not a good fit for streams.
>>>>> You can make this complex processing part of a custom stage, or you can
>>>>> create an actor for it and tie it to your other streaming parts.
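For comparison, this is what keeping all of the loop state in one place looks like when the pagination loop is plain code rather than a stream. It is a hypothetical sketch, neither an actor nor a GraphStage: PaginatedCrawl, crawl and fetchPage are illustrative names, and fetchPage stands in for fetching a page and extracting its optional next URL. Completion needs no counting, because the only outstanding work is the todo list.

```scala
import scala.annotation.tailrec

// Hypothetical sketch: the whole pagination loop kept in one place, so
// "are we done?" is simply "is the todo list empty?".
object PaginatedCrawl {
  def crawl[P](startUrls: List[String],
               fetchPage: String => (P, Option[String])): List[P] = {
    @tailrec
    def loop(todo: List[String], acc: Vector[P]): Vector[P] = todo match {
      case Nil => acc // nothing outstanding anywhere: completion is trivial
      case url :: rest =>
        val (page, next) = fetchPage(url)
        loop(next.toList ++ rest, acc :+ page) // follow the next page first
    }
    loop(startUrls, Vector.empty).toList
  }
}
```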
>>>>>
>>>>> -Endre
>>>>>
>>>>> On Tue, Dec 1, 2015 at 12:14 PM, Jean-Pierre Thomasset <[email protected]> wrote:
>>>>>
>>>>>> Hi Endre,
>>>>>>
>>>>>> Thank you for your feedback. The problem is that the termination
>>>>>> condition depends on the feedback loop being dried out. Basically, if
>>>>>> this stage had access to all stages between its output port and the
>>>>>> feedback input port, I could check that:
>>>>>>
>>>>>> - The 'external' input port of the merge operation is CLOSED_EMPTY
>>>>>> - The 'feedback' input port of the merge operation is PULLED
>>>>>> - The output port of the merge operation is PULLED
>>>>>> - All input & output ports between the output port and the
>>>>>> 'feedback' input port of the merge operation are PULLED
>>>>>>
>>>>>> However, there are at least two issues with this process:
>>>>>>
>>>>>> - The merge stage is not aware of the state of the other stages'
>>>>>> ports (at least, I did not find how ;)), and even if it had access,
>>>>>> it would be risky to read the state of another stage running in
>>>>>> another actor.
>>>>>> - A stage that pulls from its input port before pushing its current
>>>>>> payload (like a buffer, or an asynchronous stage able to perform
>>>>>> concurrent operations) would defeat the detection.
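The check described above can be written as a predicate over a snapshot of port states. This is a hypothetical model only: PortState, LoopSnapshot and DriedOut are illustrative names, and, as noted in the first issue, a real stage cannot actually observe other stages' ports.

```scala
// Hypothetical port states; these mirror the list above, not an Akka API.
sealed trait PortState
case object Pulled extends PortState      // demand signalled, nothing received
case object HasElement extends PortState  // an element is waiting at the port
case object ClosedEmpty extends PortState // upstream finished, nothing left

final case class LoopSnapshot(
    externalIn: PortState,    // main ('external') input of the merge
    feedbackIn: PortState,    // feedback input of the merge
    mergeOut: PortState,      // output of the merge
    loopPorts: Seq[PortState] // every port between mergeOut and feedbackIn
)

object DriedOut {
  // True when every port is in the state required by the list above
  def apply(s: LoopSnapshot): Boolean =
    s.externalIn == ClosedEmpty &&
      s.feedbackIn == Pulled &&
      s.mergeOut == Pulled &&
      s.loopPorts.forall(_ == Pulled)
}
```

A buffer that has already pulled its input while still holding an element would also produce an all-Pulled snapshot, so this predicate would report the loop as dried out too early, which is exactly the second issue.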
>>>>>>
>>>>>> If we don't care about the second issue, a solution would be to have
>>>>>> a probe on each edge of the graph, maybe using a custom connector
>>>>>> instead of '~>'.
>>>>>>
>>>>>> Jean-Pierre.
>>>>>> On Tuesday, December 1, 2015 at 9:29:01 AM UTC+1, drewhk wrote:
>>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> In both cases, you have to know what your terminating condition is
>>>>>>> at the merge site. Then you can use a custom GraphStage to implement
>>>>>>> the merge with the right termination logic:
>>>>>>> http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0-M2/scala/stream-customize.html#custom-processing-with-graphstage
>>>>>>>
>>>>>>> I don't yet see how to provide such a built-in stage, because the
>>>>>>> termination condition can be very different in different use cases.
>>>>>>> I will think about it, though; it would be nice if you could
>>>>>>> contribute a GraphStage that at least solves your problems.
>>>>>>>
>>>>>>> -Endre
>>>>>>>
>>>>>>> On Tue, Dec 1, 2015 at 2:02 AM, David Knapp <[email protected]> wrote:
>>>>>>>
>>>>>>>> I'm actually in pretty much the exact same position you're in,
>>>>>>>> except that instead of crawling for new URLs, I'm filtering
>>>>>>>> unfinished responses back into my stream so I can wait until they're
>>>>>>>> done. This is my graph:
>>>>>>>>
>>>>>>>> source ~> sync ~> merge.preferred
>>>>>>>> merge ~> broadcast
>>>>>>>> broadcast.out(0) ~> finishedFilter
>>>>>>>> broadcast.out(1) ~> unfinishedFilter
>>>>>>>> merge <~ syncStatusFlow <~ Limiter.limit[Long](Client.limiter, 10 seconds) <~
>>>>>>>>   Flow[SyncStatus].map(_.account) <~ unfinishedFilter
>>>>>>>>
>>>>>>>> (finishedFilter.outlet)
>>>>>>>>
>>>>>>>> On Monday, May 18, 2015 at 3:29:06 AM UTC-7, Jan Liße wrote:
>>>>>>>>>
>>>>>>>>> Hello,
>>>>>>>>>
>>>>>>>>> I'm currently building a scraper system on top of Akka Streams. I
>>>>>>>>> have written a Flow that is able to follow paginated sites and
>>>>>>>>> scrape them in a loop. For this I use a feedback merge.
>>>>>>>>> My code: <https://gist.github.com/janlisse/f2672bf8bbee009ef009>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> scrapePaginated takes a function that decides whether there are
>>>>>>>>> further pages to scrape. If there are, it returns a Some() with the
>>>>>>>>> next URL as part of the response tuple, and of course a None for
>>>>>>>>> the last page.
>>>>>>>>> The iteration and the feedback loop work, and all pages are scraped
>>>>>>>>> properly. But even when all URLs are processed, the stream never
>>>>>>>>> completes: onComplete never gets invoked.
>>>>>>>>> Is this expected behaviour? Or is there an error in my
>>>>>>>>> scrapePaginated method? I read the docs' chapter on graph deadlocks
>>>>>>>>> and liveness issues and finally added a buffer step with
>>>>>>>>> OverflowStrategy.fail to the feedback loop, but to no avail.
>>>>>>>>> If it helps to clarify the problem, I can provide a simple Spec
>>>>>>>>> that reproduces the issue.
>>>>>>>>>
>>>>>>>>> Thanks in advance for any help!
>>>>>>>>>
>>>>>>>>> Jan
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>> >>>>>>>>>> Check the FAQ:
>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>> >>>>>>>>>> Search the archives:
>>>>>>>> https://groups.google.com/group/akka-user
>>>>>>>> ---
>>>>>>>> You received this message because you are subscribed to the Google
>>>>>>>> Groups "Akka User List" group.
>>>>>>>> To unsubscribe from this group and stop receiving emails from it,
>>>>>>>> send an email to [email protected].
>>>>>>>> To post to this group, send email to [email protected].
>>>>>>>> Visit this group at http://groups.google.com/group/akka-user.
>>>>>>>> For more options, visit https://groups.google.com/d/optout.
>>>>>>>>
>>>>>>>