Hi Oliver,
Have you read the section of our docs about cyclic graphs and deadlocks?
http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M2/scala/stream-graphs.html#Graph_cycles__liveness_and_deadlocks

Because Akka Streams (like all Reactive Streams implementations) is purely
"demand driven", you have created a cycle which cannot "start": the Source
only emits once it has a result from the Sink, but the Sink will never get
its first element because the Source is waiting for the Sink to produce
something (which is waiting for the Source to emit something, which is
waiting... you see where this is headed, I hope).

When working with cycles like this you'll need to introduce something that
gets the cycle running, such as an initial element or a processing stage like
expand, which can help break this deadlock.
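
To make the "initial element" idea concrete, here is a minimal sketch. It
assumes a recent Akka Streams release (the 2.6-style GraphDSL API, which
differs from the FlowGraph API in 1.0-M2), and it uses built-in stages in
place of your ActorPublisher and ActorSubscriber. The names SeededCycle,
graph, seed and rounds are made up for illustration. Source.single injects
one element so the loop has something to circulate, and take(rounds) bounds
the loop so that the example terminates.

```scala
import akka.actor.ActorSystem
import akka.stream.ClosedShape
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Merge, RunnableGraph, Sink, Source}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object SeededCycle {

  // Hypothetical helper: builds a cyclic graph that is kicked off by one seed element.
  // Assumption: Akka 2.6-style API; the 1.0-M2 API discussed in this thread looks different.
  def graph(seed: Int, rounds: Int): RunnableGraph[Future[Seq[Int]]] =
    RunnableGraph.fromGraph(GraphDSL.create(Sink.seq[Int]) { implicit b => sink =>
      import GraphDSL.Implicits._

      val merge = b.add(Merge[Int](2))     // joins the seed with the feedback edge
      val bcast = b.add(Broadcast[Int](2)) // sends each result to the sink and back into the loop

      // Stand-in for Flow1 ~> ... ~> FlowN; take(rounds) stops the loop after `rounds` results.
      val step = Flow[Int].map(_ + 1).take(rounds)

      Source.single(seed) ~> merge ~> step ~> bcast ~> sink
                             merge     <~     bcast // feedback edge that closes the cycle
      ClosedShape
    })

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("seeded-cycle")
    try {
      // Blocking is only done here, at the edge of the demo, never inside a stage or actor.
      val results = Await.result(graph(seed = 0, rounds = 5).run(), 5.seconds)
      println(results)
    } finally {
      system.terminate()
    }
  }
}
```

Without the Source.single seed nothing would ever enter the loop, which is
the situation described above. Only one element circulates here, so the
feedback edge cannot fill up; a cycle that keeps accepting new elements from
outside would also need a buffer or a dropping strategy on the feedback edge.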

On Thu, Apr 16, 2015 at 3:37 PM, Oliver Winks <
oliverwinks.develo...@gmail.com> wrote:

> Hi,
>
> I would like to create a simple flow that consists of a Source
> (ActorPublisher), several Flows and a Sink (ActorSubscriber) like this:
>
> ActorPublisher ~> Flow1 ~> Flow2 ~> .... ~> FlowN ~> ActorSubscriber
>
> However, I would like to connect the Sink (ActorSubscriber) and Source
> (ActorPublisher) so that the result of the Sink is passed back to the
> Source and passed through the flow again. Basically I want a recursive or
> cyclical FlowGraph.
>
> I've tried to do this by sending a message from the Sink to the Source
> which contains the result from the Sink, but this causes the Flow to hang;
> it looks like a deadlock. I've also tried to request the previous result
> from the Sink when a Request is acted upon by the Source. Pseudocode
> below:
>
> // Source receive block
> def receive: Receive = {
>   case Request(_) =>
>     implicit val timeout = Timeout(5 seconds)
>     val future = sink ? "GetPrevResult"
>
>     val prevRes = Await.result(future, timeout.duration)
>
>     onNext(prevRes)
> }
>
>
> // Sink receive blocks
> def receive: Receive = {
>   case OnNext(elem) => // `val` is a reserved word, so bind as `elem`
>     val res = //... do stuff
>     context.become(receiveWithPrev(res))
> }
>
> def receiveWithPrev(prev: Any): Receive = {
>   case OnNext(elem) =>
>     val res = //... do stuff
>     context.become(receiveWithPrev(res))
>
>   case "GetPrevResult" =>
>     sender() ! prev // reply to the Source that asked
> }
>
>
> Neither of these approaches works. The code above times out waiting for
> the previous result. Is there a standard mechanism in Akka Streams for
> doing this?
>
> Cheers,
>
>  --
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>> Check the FAQ:
> http://doc.akka.io/docs/akka/current/additional/faq.html
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to akka-user+unsubscr...@googlegroups.com.
> To post to this group, send email to akka-user@googlegroups.com.
> Visit this group at http://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>



-- 
Cheers,
Konrad 'ktoso' Malawski
Akka <http://akka.io/> @ Typesafe <http://typesafe.com/>
