Hi,

In the Akka Streams documentation
http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-RC2/scala/stream-graphs.html

the section "Graph cycles, liveness and deadlocks" gives an example of how
to handle cycles with MergePreferred. In my code I wrote a FlexiMerge:


// Coproduct, :+: and CNil come from shapeless.
class Multiplexer3[I1, I2, I3, O](
    logger: Logger,
    name: String,
    f: :+:[I1, :+:[I2, :+:[I3, CNil]]] => Option[O])
  extends FlexiMerge[O, FanInShape3[I1, I2, I3, O]](
    new FanInShape3[I1, I2, I3, O](name), OperationAttributes.name(name)) {

  import FlexiMerge._

  override def createMergeLogic(p: FanInShape3[I1, I2, I3, O]): MergeLogic[O] =
    new MergeLogic[O] {
      // Prefer the ack port (in2); fall back to in0 and in1.
      override def initialState: State[_] =
        State(ReadPreferred(p.in2, List(p.in0, p.in1))) { (ctx, port, element) =>
          logger.debug(s"got message from port $port and element $element")
          port match {
            case p.in0 =>
              logger.debug("selected port one")
              f(Coproduct[:+:[I1, :+:[I2, :+:[I3, CNil]]]](element.asInstanceOf[I1])).foreach(ctx.emit)
            case p.in1 =>
              logger.debug("selected port two")
              f(Coproduct[:+:[I1, :+:[I2, :+:[I3, CNil]]]](element.asInstanceOf[I2])).foreach(ctx.emit)
            case p.in2 =>
              logger.debug("selected port three")
              f(Coproduct[:+:[I1, :+:[I2, :+:[I3, CNil]]]](element.asInstanceOf[I3])).foreach(ctx.emit)
          }
          SameState
        }

      // Was "override def eagerClose = eagerClose", which refers to itself.
      override def initialCompletionHandling: CompletionHandling = eagerClose
    }
}

It takes input from three ports and applies a function to each incoming
element. If the function returns something other than None, it emits the
returned element. In my graph I made a cycle to carry acknowledgement
messages, and it feeds into "p.in2" of the stage above. But as I understand
it, and from what I saw in my tests, if the sources I connect to p.in0 and
p.in1 are fast enough, they fill the buffer of that stage and read
preferred does not solve the issue. As I understand it, read preferred
works on the buffer, which means that if the buffer is already filled with
non-preferred messages, there is nothing to do other than handle my ack
messages in another stage. I did not do it that way because there is some
state about the messages I sent, so handling them in one stage allows me to
change that state without locking or synchronization. Can you explain how
read preferred works, and do you have any suggestions for solving this
issue? :)
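
To make the behaviour I am describing concrete, here is a toy model in plain Scala with no Akka involved. It is only a sketch of my understanding, not how FlexiMerge is actually implemented: Port, readPreferred and demo are made-up names, and the per-port bounded buffer is an assumption. It shows that the preference is only applied at the moment of each read, so an ack overtakes elements that are still waiting but not ones that were already taken.

```scala
import scala.collection.mutable

// Toy model only; this is NOT Akka's implementation.
object PreferredReadModel {

  // A bounded FIFO buffer standing in for one input port (hypothetical).
  final class Port(val name: String, capacity: Int) {
    private val buf = mutable.Queue.empty[String]

    // Returns false when the buffer is full, mimicking back-pressure.
    def offer(e: String): Boolean =
      if (buf.size < capacity) { buf.enqueue(e); true } else false

    def nonEmpty: Boolean = buf.nonEmpty

    def poll(): Option[String] =
      if (buf.isEmpty) None else Some(buf.dequeue())
  }

  // Take from the preferred port if it has an element right now,
  // otherwise from the first secondary port that has one.
  def readPreferred(preferred: Port, secondaries: Seq[Port]): Option[String] =
    if (preferred.nonEmpty) preferred.poll()
    else secondaries.find(_.nonEmpty).flatMap(_.poll())

  def demo(): Seq[String] = {
    val in0 = new Port("in0", 2)
    val in1 = new Port("in1", 2)
    val ack = new Port("in2", 2)

    in0.offer("a")
    in0.offer("b")
    // No ack is available yet, so a secondary element is taken.
    val first = readPreferred(ack, Seq(in0, in1))
    // The ack arrives only after "a" has already been read.
    ack.offer("ack1")
    val second = readPreferred(ack, Seq(in0, in1))
    val third = readPreferred(ack, Seq(in0, in1))
    Seq(first, second, third).flatten
  }
}
```

In this model PreferredReadModel.demo() yields the elements in the order a, ack1, b: the ack jumps ahead of "b", which was still waiting, but cannot jump ahead of "a".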


