Hello,

Here is the implementation of a custom feedback stage. It has three ports:
- out [T]: The output of the merge operation made by this stage
- in [T] : The main input port (outside the feedback loop)
- feedback [Option[T]] : the feedback loop input port

Keep in mind that it only works if the number of elements flowing out of the 
stage exactly matches the number of elements flowing back into the feedback 
port. The stage completes once the main input port (in) is completed and 
there are no outstanding requests left in the feedback loop (the last element 
in this loop should be a None, which lets the stage complete).

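  import akka.stream.{Attributes, FanInShape, Inlet, Outlet}
  import akka.stream.FanInShape.Init
  import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}

  class FeedbackShape[T](val _init: FanInShape.Init[T]) extends FanInShape[T](_init) {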
    def this() = this(FanInShape.Name[T]("FeedbackShape"))

    override protected def construct(init: Init[T]): FanInShape[T] = new FeedbackShape(init)
    override def deepCopy(): FeedbackShape[T] = super.deepCopy().asInstanceOf[FeedbackShape[T]]


    val in = newInlet[T]("in")
    val feedback = newInlet[Option[T]]("feedback")
  }

  class Feedback[T] extends GraphStage[FeedbackShape[T]] {
    // Define the shape of this stage: a FeedbackShape with the ports defined above
    override val shape: FeedbackShape[T] = new FeedbackShape


    def in: Inlet[T] = shape.in
    def out: Outlet[T] = shape.out
    def feedback: Inlet[Option[T]] = shape.feedback


    // This is where the actual (possibly stateful) logic will live
    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
      var flowingCount = 0

      def checkComplete(): Unit = {
        // Complete this stage if there are no data in the feedback loop
        if(flowingCount == 0 && isClosed(in)) {
          completeStage()
        }
      }

      def pullSecondary(): Unit = tryPull(in)

      /**
       * Output port handler
       */
      setHandler(out, new OutHandler {
        private var first = true
        override def onPull(): Unit = {
          if (first) {
            first = false
            tryPull(feedback)
            tryPull(in)
          }
        }
      })

      var feedbackEmitting = 0

      /**
       * Preferred input port handler
       */
      setHandler(feedback, new InHandler {
        override def onUpstreamFinish(): Unit = checkComplete()
        override def onPush(): Unit = {
          /* Received back the element in the feedback loop */
          flowingCount -= 1
          emitFeedback()
        }

        def emitFeedback(): Unit = {
          feedbackEmitting += 1

          grab(feedback) match {
            case Some(e) => {
              flowingCount += 1
              emit(out, e, emitted)
            }
            case None => checkComplete()
          }
          tryPull(feedback)
        }

        val emitted = () ⇒ {
          feedbackEmitting -= 1
          if (isAvailable(feedback)) emitFeedback()
          else if (feedbackEmitting == 0) emitSecondary()
        }

        def emitSecondary(): Unit = {
          if (isAvailable(in)) {
            flowingCount += 1
            emit(out, grab(in), pullSecondary)
          }
        }

      })

      /**
       * Secondary input port handler
       */
      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          if (feedbackEmitting == 0) {
            flowingCount += 1
            emit(out, grab(in), pullSecondary)
          }
        }
        override def onUpstreamFinish(): Unit = checkComplete()
      })


    }
  }
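
As a side note, the completion bookkeeping can be looked at in isolation. 
The following is a minimal sketch in plain Scala (no Akka), assuming the same 
rule as checkComplete() above. LoopTracker and its method names are 
hypothetical and exist only for this illustration:

```scala
// Plain-Scala model of the stage's completion bookkeeping (no Akka needed).
// LoopTracker and its method names are hypothetical, chosen for this sketch.
final class LoopTracker {
  private var flowingCount = 0      // elements currently inside the feedback loop
  private var inputClosed = false   // has the main input port completed?

  /** An element was emitted on `out`, so it is now inside the loop. */
  def emitted(): Unit = { flowingCount += 1 }

  /** A message (Some or None) arrived on the feedback port. */
  def returned(): Unit = {
    require(flowingCount > 0, "feedback message without a matching output")
    flowingCount -= 1
  }

  /** The main input port has completed. */
  def closeInput(): Unit = { inputClosed = true }

  /** Same condition as checkComplete() in the stage. */
  def canComplete: Boolean = inputClosed && flowingCount == 0
}

object LoopTrackerDemo {
  def main(args: Array[String]): Unit = {
    val t = new LoopTracker
    t.emitted()            // an element from `in` goes out
    t.closeInput()         // the source is done, but one element is still looping
    println(t.canComplete) // false
    t.returned()           // Some(next) comes back ...
    t.emitted()            // ... and is re-emitted on `out`
    println(t.canComplete) // false
    t.returned()           // None comes back: nothing is re-emitted
    println(t.canComplete) // true
  }
}
```

This also shows why a loop stage that drops or duplicates elements breaks the 
scheme: the counter would never return to zero, or would reach it too early.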

It can be used in a graph of this shape:
source ~> feedback.in
          feedback.out                ~>          broadcast ~> output/sink
          feedback.feedback   <~ loopOperation <~ broadcast

where loopOperation must decide whether the data needs to be sent back into 
the loop, by emitting either
- Some(t) : if the loop shall continue
- None : if the loop must end for the given input

The feedback stage is responsible for unboxing the Option[T] sent to its 
feedback port and sends it as a T to the out port.
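
To illustrate the contract, here is a minimal sequential sketch in plain 
Scala (no Akka, no back-pressure or concurrency). It assumes loopOperation 
answers each element with exactly one Option. FeedbackLoopSketch, run and 
nextPage are hypothetical names used for this illustration only:

```scala
import scala.collection.immutable.Queue

object FeedbackLoopSketch {
  /**
   * Sequential model of the graph: every element emitted on `out` is passed
   * to `loopOperation`, which answers with exactly one Option per element.
   * Returns everything seen on `out`, in order of emission.
   */
  def run[T](inputs: List[T])(loopOperation: T => Option[T]): List[T] = {
    @annotation.tailrec
    def go(main: List[T], feedback: Queue[Option[T]], flowing: Int, out: Vector[T]): Vector[T] =
      feedback.dequeueOption match {
        // The feedback port is preferred over the main input.
        case Some((msg, rest)) =>
          msg match {
            // One element returned, one re-emitted: the count is unchanged.
            case Some(e) => go(main, rest.enqueue(loopOperation(e)), flowing, out :+ e)
            // The loop ended for this input: one fewer element in flight.
            case None    => go(main, rest, flowing - 1, out)
          }
        case None =>
          main match {
            case e :: tail => go(tail, feedback.enqueue(loopOperation(e)), flowing + 1, out :+ e)
            case Nil =>
              // Input exhausted and nothing in flight: the stage may complete.
              assert(flowing == 0)
              out
          }
      }
    go(inputs, Queue.empty, 0, Vector.empty).toList
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical loopOperation: follow "next" links up to a multiple of 3.
    val nextPage: Int => Option[Int] = n => if (n % 3 != 0) Some(n + 1) else None
    println(run(List(1, 7))(nextPage)) // List(1, 2, 3, 7, 8, 9)
  }
}
```

In the real stage the elements of different inputs can interleave; the 
sketch only shows the Some/None protocol and the counting.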

I hope this will help you.

Best Regards,
JP.


On Thursday, December 3, 2015 at 12:10:27 PM UTC+1, Jean-Pierre Thomasset 
wrote:
>
> For my problem, I finally found a solution, although it may not work in 
> all cases. The solution is possible because I know that an object flowing 
> in the feedback loop cannot be multiplied (in my case there can be at most 
> one 'next url' in the page currently fetched). So by changing 
> extractNextUrl to output Option[T] instead of T, I can track at the 
> merge site whether there is another page to crawl, and thus decide whether 
> I need to finish this stage.
>
>                                          broadcast ~> sink
> source ~> merge           ~> fetchUrl ~> broadcast ~> extractNextUrl
>           merge.preferred             <~              extractNextUrl
>
> Here is what I want to do : 
> - The extractNextUrl will send Some(t) when a next url is found, and None 
> when there is none.
> - The 'merge' stage will count outstanding data in the feedback loop. i.e, 
> the difference between the output count and the merge.preferred input count 
> (including None). 
> - When the first input port of the merge operation is closed, the last 
> None has been received from extractNextUrl, and there are no outstanding 
> requests, finish this stage.
>
> I'll try to implement this solution in a custom merge stage this weekend 
> and I'll let you know if it works.
>
> Regards,
> Jean-Pierre.
>
>
> On Wednesday, December 2, 2015 at 8:58:38 PM UTC+1, David Knapp wrote:
>>
>> I agree with your proposed solution. Maybe a Source that emits a "this is 
>> the last element" element, but something that the stage can implicitly 
>> handle or something. 
>>
>> On Tuesday, December 1, 2015 at 5:08:29 AM UTC-7, Jean-Pierre Thomasset 
>> wrote:
>>>
>>> Thank you for the message,
>>>
>>> I thought of that, but as the stream approach worked really well when the 
>>> flow is uninterrupted, I tried to find a solution to the completion 
>>> problem. I think there are multiple cases of feedback loop completion 
>>> deadlock and I was hoping there was a nice solution.
>>>
>>> One solution would be to have some kind of out-of-band message passing 
>>> through the graph, or some kind of tryClose/tryFinish operation that would 
>>> fail on the first stage still processing data. That would allow sending the 
>>> operation or message into the feedback loop and waiting until it comes back 
>>> to the merge operation. This is something I tried to implement using an 
>>> Either[T, PoisonPill] flowing in the graph, but it requires each stage to 
>>> handle this type, which can be a pain.
>>>
>>> Regards,
>>> Jean-Pierre.
>>>
>>> On Tuesday, December 1, 2015 at 12:23:53 PM UTC+1, drewhk wrote:
>>>>
>>>> Hi,
>>>>
>>>> In this case I don't see the reason of modeling the loop as a stream. 
>>>> By making it a stream you make all this state dispersed among concurrent 
>>>> entities, but in reality this is all state that needs to be at one place. 
>>>> If this is the case then this is clearly not a good fit for streams. You 
>>>> can make this complex processing part of a custom stage, or you can create 
>>>> an actor for this and tie it to your other streaming parts. 
>>>>
>>>> -Endre
>>>>
>>>> On Tue, Dec 1, 2015 at 12:14 PM, Jean-Pierre Thomasset <
>>>> [email protected]> wrote:
>>>>
>>>>> Hi Endre,
>>>>>
>>>>> Thank you for your feedback. The problem is that the termination 
>>>>> condition depends on the feedback loop having dried out. Basically, if 
>>>>> this stage had access to all stages between its output port and the 
>>>>> feedback input port, I could check that
>>>>>
>>>>>    - The 'external' input port of the merge operation is CLOSED_EMPTY
>>>>>    - The 'feedback' input port of the merge operation is PULLED
>>>>>    - The output port of the merge operation is PULLED
>>>>>    - All input & output ports between the output port and the 
>>>>>    'feedback' input port of the merge operation are PULLED
>>>>>
>>>>> However there are at least two issues with this process : 
>>>>>
>>>>>    - The merge stage is not aware of the state of all the stages' 
>>>>>    ports (at least, I did not find out how ;)) and even if it had access, 
>>>>>    it would be risky to access the state of another stage running in 
>>>>>    another actor.
>>>>>    - A stage pulling from its input port before pushing its current 
>>>>>    payload (like a buffer or an asynchronous stage able to perform 
>>>>>    concurrent operations) would defeat the detection.
>>>>>
>>>>> If we don't care about the second issue, a solution would be to have a 
>>>>> probe at each edge of the graph, maybe using a custom connector instead 
>>>>> of '~>'.
>>>>>
>>>>> Jean-Pierre.
>>>>> On Tuesday, December 1, 2015 at 9:29:01 AM UTC+1, drewhk wrote:
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> In both cases, you have to know what your terminating condition is at 
>>>>>> the merge site. Then you can use a custom GraphStage to implement the 
>>>>>> merge and have the right termination logic: 
>>>>>> http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0-M2/scala/stream-customize.html#custom-processing-with-graphstage
>>>>>>
>>>>>> I don't yet see how to provide such a built-in stage because the 
>>>>>> termination condition can be very different in different use-cases. Will 
>>>>>> think about it though, but it would be nice if you could contribute a 
>>>>>> GraphStage that at least solved your problems.
>>>>>>
>>>>>> -Endre
>>>>>>
>>>>>> On Tue, Dec 1, 2015 at 2:02 AM, David Knapp <[email protected]> 
>>>>>> wrote:
>>>>>>
>>>>>>> I'm actually in pretty much the exact same position you're in, 
>>>>>>> except instead of crawling for new URLs, I'm filtering back unfinished 
>>>>>>> responses into my stream so I can wait until they're done. This is my 
>>>>>>> graph
>>>>>>>
>>>>>>>
>>>>>>> source ~> sync ~> merge.preferred
>>>>>>>                                            merge ~> broadcast
>>>>>>>                                                     broadcast.out(0) ~> finishedFilter
>>>>>>>                                                     broadcast.out(1) ~> unfinishedFilter
>>>>>>>           merge <~ syncStatusFlow <~ Limiter.limit[Long](Client.limiter, 10 seconds) <~ Flow[SyncStatus].map(_.account) <~ unfinishedFilter
>>>>>>>
>>>>>>>
>>>>>>> (finishedFilter.outlet)
>>>>>>>
>>>>>>> On Monday, May 18, 2015 at 3:29:06 AM UTC-7, Jan Liße wrote:
>>>>>>>>
>>>>>>>> Hello,
>>>>>>>>
>>>>>>>> I'm currently building a scraper system on top of Akka Streams. I 
>>>>>>>> have written a Flow that is able to follow paginated sites and scrape 
>>>>>>>> them in a loop.
>>>>>>>> For this I use a feedback merge. 
>>>>>>>>
>>>>>>>> My code: <https://gist.github.com/janlisse/f2672bf8bbee009ef009>
>>>>>>>>
>>>>>>>> scrapePaginated takes a function that decides if there are further 
>>>>>>>> pages to scrape. If there are, it returns a Some() with the next URL 
>>>>>>>> as part of the response tuple, and of course a None for the last page.
>>>>>>>> The iteration and the feedback loop work and all pages are scraped 
>>>>>>>> properly. But even when all URLs are processed the stream never 
>>>>>>>> completes. OnComplete never gets invoked.
>>>>>>>> Is this expected behaviour? Or is there an error in my 
>>>>>>>> scrapePaginated method? I read the docs' chapter on graph deadlocks 
>>>>>>>> and liveness issues and finally added a buffer step with 
>>>>>>>> OverflowStrategy.Fail to the feedback loop, but to no avail.
>>>>>>>> If it helps to clarify the problem I can provide a simple Spec that 
>>>>>>>> reproduces the issue.
>>>>>>>>
>>>>>>>> Thanks in advance for any help!
>>>>>>>>
>>>>>>>> Jan  
>>>>>>>>
>>>>>>>>
>>>>>>>> -- 
>>>>>>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>> >>>>>>>>>> Check the FAQ: 
>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>> >>>>>>>>>> Search the archives: 
>>>>>>> https://groups.google.com/group/akka-user
>>>>>>> --- 
>>>>>>> You received this message because you are subscribed to the Google 
>>>>>>> Groups "Akka User List" group.
>>>>>>> To unsubscribe from this group and stop receiving emails from it, 
>>>>>>> send an email to [email protected].
>>>>>>> To post to this group, send email to [email protected].
>>>>>>> Visit this group at http://groups.google.com/group/akka-user.
>>>>>>> For more options, visit https://groups.google.com/d/optout.
>>>>>>>
>>>>>>
>>>>
>>>>
