I had the same type of use case recently and I think there might be some other potential strategies for completing feedback loops.
Here are some rough pseudocode sketches:

*CASE # 1*: An Option[T] going back into the loop:

- takeWhile <https://doc.akka.io/docs/akka/2.5.7/stream/stages-overview.html?language=scala#takewhile> Option[T].nonEmpty
- mapConcat <https://doc.akka.io/docs/akka/2.5.7/stream/stages-overview.html?language=scala#mapconcat> to transform Option[T] into T, because the merge takes T, not Option[T]

    val takeWhile = Flow[Option[T]].takeWhile(_.nonEmpty)
    val mapConcat = Flow[Option[T]].mapConcat(_.toList)

                                   broadcast ~> sink
    source ~> merge ~> fetchUrl ~> broadcast ~> extractNextUrlOpt
              merge <~ mapConcat <~ takeWhile <~ extractNextUrlOpt

*CASE # 2*: A varying number of T's going back into the loop (instead of an Option[T] we are dealing with a List[T]):

- a statefulMapConcat <https://doc.akka.io/docs/akka/2.5.7/stream/stages-overview.html?language=scala#statefulmapconcat> from List[T] to a tuple (Long, List[T]), where `Long` is a stateful counter tracking the number of outstanding tasks
- takeWhile <https://doc.akka.io/docs/akka/2.5.7/stream/stages-overview.html?language=scala#takewhile> counter != 0
- mapConcat <https://doc.akka.io/docs/akka/2.5.7/stream/stages-overview.html?language=scala#mapconcat> to turn List[T] into individual Ts, because the merge takes T, not List[T]

    val counter = Flow[List[T]].statefulMapConcat { () =>
      // Initialize the stateful counter to the # of tasks we start out with in our Source
      var counter: Long = sourceSize
      newTasks => {
        counter -= 1              // Each time we get a list of new tasks, we know that exactly one old task is done
        counter += newTasks.size  // And exactly the # of tasks in the list has been added
        List((counter, newTasks)) // statefulMapConcat expects an Iterable, so wrap the tuple
      }
    }
    val takeWhile = Flow[(Long, List[T])].takeWhile { case (counter, _) => counter != 0 }
    val mapConcat = Flow[(Long, List[T])].mapConcat { case (_, newTasks) => newTasks }

                                   broadcast ~> sink
    source ~> merge ~> fetchUrl ~> broadcast ~> extractNextUrlList
              merge <~ mapConcat <~ takeWhile <~ counter <~ extractNextUrlList

(In practice you might want something other than a List for better time complexity of the `size` method, etc.; I'm using a List here just to keep things simple.)

*CASE # 3*: Sometimes a feedback loop arises because a node in the graph holds state that has to handle messages from several other places in the graph. In this scenario, the loop can sometimes be "straightened out" by holding the state in a separate actor and using the mapAsync flow + ask pattern <https://doc.akka.io/docs/akka/2.5.7/stream/stream-integrations.html?language=scala#mapasync-ask> from multiple places in a "straight line" graph that no longer contains a feedback loop.
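
To make the counter bookkeeping from CASE # 2 a bit more concrete, here is a minimal plain-Scala sketch that replays it on an ordinary work list, with no Akka involved. The names `OutstandingCounter`, `expand` and `run` are made up for illustration, and `expand` is only a hypothetical stand-in for fetchUrl + extractNextUrlList. In this simulation the pending list makes the counter redundant; in the stream, the in-flight tasks are spread across stages, which is why a separate counter is needed there.

```scala
object OutstandingCounter {
  // Hypothetical stand-in for fetchUrl + extractNextUrlList:
  // task n produces the new tasks n - 1 and n - 2, as long as they are positive.
  def expand(task: Int): List[Int] = List(task - 1, task - 2).filter(_ > 0)

  // Processes tasks until the counter reaches zero and returns the tasks in
  // processing order together with whatever is still pending at that point.
  def run(initial: List[Int]): (Vector[Int], List[Int]) = {
    var counter: Long = initial.size // same role as sourceSize in the sketch above
    var pending: List[Int] = initial
    var processed = Vector.empty[Int]

    while (counter != 0) {
      // counter != 0 implies pending is non-empty, so head/tail are safe here
      val task = pending.head
      val newTasks = expand(task)
      counter -= 1             // exactly one old task is done
      counter += newTasks.size // and this many new tasks were added
      pending = pending.tail ++ newTasks
      processed = processed :+ task
    }
    (processed, pending)
  }
}
```

Starting from a single task 3, `OutstandingCounter.run(List(3))` processes 3, 2, 1, 1 and stops with nothing left pending, which is the point at which takeWhile would complete the feedback branch.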
