I had the same kind of use case recently, and I think there are a few 
other strategies for completing (i.e. terminating) feedback loops.

Here are some rough pseudocode sketches:

*CASE # 1*: An Option[T] going back into the loop:
   - takeWhile 
     <https://doc.akka.io/docs/akka/2.5.7/stream/stages-overview.html?language=scala#takewhile>
     on Option[T].nonEmpty, so the feedback branch completes at the first None
   - mapConcat 
     <https://doc.akka.io/docs/akka/2.5.7/stream/stages-overview.html?language=scala#mapconcat>
     to transform Option[T] into T, because the merge takes T, not Option[T]

val takeWhile = Flow[Option[T]].takeWhile(_.nonEmpty)
val mapConcat = Flow[Option[T]].mapConcat(_.toList)

                                broadcast ~> sink
source ~> merge ~> fetchUrl ~>  broadcast ~> extractNextUrlOpt
          merge <~ mapConcat <~ takeWhile <~ extractNextUrlOpt
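
To make the wiring above concrete, here is a minimal runnable sketch, assuming Akka 2.5.x. Everything domain-specific is a hypothetical stand-in: a "URL" is just an Int, `Page` is an invented type, and page n links to page n + 1 until page 5. The `buffer` on the feedback edge is my own addition as a precaution, since the Akka docs warn that cycles can deadlock under back-pressure; it is not part of the sketch above.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ClosedShape, OverflowStrategy}
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Merge, RunnableGraph, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object OptionFeedbackLoop {
  // Hypothetical stand-ins for the real stages.
  final case class Page(id: Int)
  val fetchUrl = Flow[Int].map(Page(_))
  // Page n links to n + 1; page 5 has no next page (None).
  val extractNextUrlOpt = Flow[Page].map(p => Option(p.id + 1).filter(_ <= 5))

  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("option-feedback")
    implicit val materializer: ActorMaterializer = ActorMaterializer()

    val graph = RunnableGraph.fromGraph(GraphDSL.create(Sink.seq[Page]) { implicit b => sink =>
      import GraphDSL.Implicits._
      val merge = b.add(Merge[Int](2))
      val broadcast = b.add(Broadcast[Page](2))
      val takeWhile = Flow[Option[Int]].takeWhile(_.nonEmpty) // completes the loop at the first None
      val mapConcat = Flow[Option[Int]].mapConcat(_.toList)   // Option[Int] => Int
      // Assumption: a small buffer gives the cycle some slack under back-pressure.
      val buffer = Flow[Int].buffer(16, OverflowStrategy.backpressure)

      Source.single(1) ~> merge ~> fetchUrl ~> broadcast ~> sink.in
      broadcast ~> extractNextUrlOpt ~> takeWhile ~> mapConcat ~> buffer ~> merge
      ClosedShape
    })

    // Block for the result only because this is a demo.
    try Await.result(graph.run(), 5.seconds).map(_.id)
    finally system.terminate()
  }
}
```

Once takeWhile sees the None, the feedback input of the merge completes; the source has already completed by then, so the merge and everything downstream of it complete as well.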


*CASE # 2*: A varying number of Ts going back into the loop (instead of an 
Option[T] we are dealing with a List[T]):
   - statefulMapConcat 
     <https://doc.akka.io/docs/akka/2.5.7/stream/stages-overview.html?language=scala#statefulmapconcat>
     from List[T] to a tuple (Long, List[T]), where the `Long` is a stateful 
     counter tracking the number of outstanding tasks
   - takeWhile 
     <https://doc.akka.io/docs/akka/2.5.7/stream/stages-overview.html?language=scala#takewhile>
     on counter != 0
   - mapConcat 
     <https://doc.akka.io/docs/akka/2.5.7/stream/stages-overview.html?language=scala#mapconcat>
     to turn List[T] into individual Ts, because the merge takes T, not List[T]

val counter = Flow[List[T]].statefulMapConcat { () =>
  // Stateful counter, initialized to the number of tasks the Source starts with
  var counter: Long = sourceSize
  newTasks => {
    counter -= 1             // each incoming list means exactly one old task is done
    counter += newTasks.size // and exactly newTasks.size new tasks have been added
    // statefulMapConcat must return an Iterable, so wrap the tuple in a List
    List((counter, newTasks))
  }
}
val takeWhile = Flow[(Long, List[T])].takeWhile { case (counter, _) => counter != 0 }
val mapConcat = Flow[(Long, List[T])].mapConcat { case (_, newTasks) => newTasks }

                                        broadcast   ~>  sink
source ~> merge   ~>   fetchUrl   ~>    broadcast   ~>  extractNextUrlList
          merge <~ mapConcat <~ takeWhile <~ counter <~ extractNextUrlList

(In practice you might want something other than a List for better time 
complexity of the `size` method, etc.; I'm using a List here just to keep 
things simple).
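
To see why `counter != 0` is a safe stopping condition, here is a small plain-Scala simulation of the counter arithmetic, with no Akka involved. The task graph is a hypothetical example (task n spawns 2n and 2n + 1 as long as they are <= 7), and the queue only stands in for "tasks currently somewhere in the loop".

```scala
import scala.collection.immutable.Queue

object OutstandingCounter {
  // Hypothetical task graph: task n spawns 2n and 2n + 1 while they are <= 7.
  def extractNext(task: Int): List[Int] =
    List(2 * task, 2 * task + 1).filter(_ <= 7)

  // Returns the tasks in processing order and the counter value after each one.
  def simulate(initial: List[Int]): (List[Int], List[Long]) = {
    var counter: Long = initial.size // same initial value as sourceSize
    var pending = Queue(initial: _*)
    val processed = List.newBuilder[Int]
    val counters = List.newBuilder[Long]
    while (pending.nonEmpty) {
      val (task, rest) = pending.dequeue
      val newTasks = extractNext(task)
      counter -= 1             // one old task is done
      counter += newTasks.size // newTasks.size tasks were added
      pending = rest ++ newTasks
      processed += task
      counters += counter
    }
    (processed.result(), counters.result())
  }
}
```

Starting from `List(1)`, the counter stays positive while any task is outstanding and only reaches 0 after the last task. That last update always carries an empty list (the counter was 1 and nothing was added), so cutting the stream there with takeWhile drops no work.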


*CASE # 3*: Sometimes a feedback loop arises only because one node in the 
graph holds state and has to handle messages from several other places in 
the graph. In that scenario the loop can sometimes be "straightened out": 
hold the state in a separate actor and query it with the mapAsync + ask 
pattern 
<https://doc.akka.io/docs/akka/2.5.7/stream/stream-integrations.html?language=scala#mapasync-ask>
from each place that needs it. The result is a "straight line" graph that 
no longer contains a feedback loop.
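
A minimal sketch of that idea, assuming Akka 2.5.x. The actor, its message protocol (`MarkVisited`, `FirstVisit`, `CountVisited`) and the sample URLs are all hypothetical; the point is only that two different stages of a linear stream talk to the same stateful actor.

```scala
import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.ask
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

object AskInsteadOfLoop {
  // Hypothetical protocol for the actor that owns the shared state.
  final case class MarkVisited(url: String)
  final case class FirstVisit(url: String, first: Boolean)
  case object CountVisited

  class VisitedUrls extends Actor {
    private var visited = Set.empty[String]
    def receive: Receive = {
      case MarkVisited(url) =>
        val first = !visited.contains(url)
        visited += url
        sender() ! FirstVisit(url, first)
      case CountVisited =>
        sender() ! visited.size
    }
  }

  def run(): Seq[(String, Int)] = {
    implicit val system: ActorSystem = ActorSystem("ask-example")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    implicit val timeout: Timeout = Timeout(3.seconds)
    val state = system.actorOf(Props(new VisitedUrls))

    val result = Source(List("a", "b", "a", "c"))
      // First place in the line that needs the state: record the visit.
      .mapAsync(parallelism = 1)(url => (state ? MarkVisited(url)).mapTo[FirstVisit])
      .filter(_.first)
      // Second place, further downstream, asking the same actor.
      .mapAsync(parallelism = 1) { v =>
        (state ? CountVisited).mapTo[Int].map(n => v.url -> n)(system.dispatcher)
      }
      .runWith(Sink.seq)

    // Block for the result only because this is a demo.
    try Await.result(result, 5.seconds)
    finally system.terminate()
  }
}
```

mapAsync preserves element order, so the URLs come out as a, b, c. The counts attached to them depend on how far the upstream stage has progressed when the second ask arrives, so they should not be relied on.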
