Hi,
In the Akka Streams documentation
http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-RC2/scala/stream-graphs.html
the section "Graph cycles, liveness and deadlocks" gives an example of how
to handle cycles with MergePreferred. In my code I wrote a FlexiMerge:
> class Multiplexer3[I1, I2, I3, O](logger: Logger, name: String,
>     f: :+:[I1, :+:[I2, :+:[I3, CNil]]] => Option[O])
>   extends FlexiMerge[O, FanInShape3[I1, I2, I3, O]](
>     new FanInShape3[I1, I2, I3, O](name), OperationAttributes.name(name)) {
>
>   import FlexiMerge._
>
>   override def createMergeLogic(p: FanInShape3[I1, I2, I3, O]): MergeLogic[O] =
>     new MergeLogic[O] {
>       // Prefer the ack port (in2); fall back to in0 and in1.
>       override def initialState: State[_] =
>         State(ReadPreferred(p.in2, List(p.in0, p.in1))) { (ctx, port, element) =>
>           logger.debug(s"got message from port $port and element $element")
>           port match {
>             case p.in0 =>
>               logger.debug(s"selected port one")
>               f(Coproduct[:+:[I1, :+:[I2, :+:[I3, CNil]]]](element.asInstanceOf[I1]))
>                 .foreach(ctx.emit)
>             case p.in1 =>
>               logger.debug(s"selected port two")
>               f(Coproduct[:+:[I1, :+:[I2, :+:[I3, CNil]]]](element.asInstanceOf[I2]))
>                 .foreach(ctx.emit)
>             case p.in2 =>
>               logger.debug(s"selected port three")
>               f(Coproduct[:+:[I1, :+:[I2, :+:[I3, CNil]]]](element.asInstanceOf[I3]))
>                 .foreach(ctx.emit)
>           }
>           SameState
>         }
>
>       // eagerClose: complete the output as soon as any input completes.
>       override def initialCompletionHandling: CompletionHandling = eagerClose
>     }
> }
It takes input from three ports and applies a function to each element.
If the function returns something other than None, it emits the returned
element. In my graph I made a cycle to carry acknowledgement messages, and it
feeds into "p.in2" in the code above. But as I understand it, and from what I
saw in my tests, if the sources connected to p.in0 and p.in1 are fast enough,
they fill the buffer of this component, and ReadPreferred does not solve that.
As I understand it, ReadPreferred works on the buffer, which means that if the
buffer is already full of non-preferred messages, there is nothing I can do
other than handle my ack messages in another stage. I did not do it that way
because there is some state about the messages I have sent, and handling
everything in one stage lets me change that state without locking or
synchronization. Can you explain how ReadPreferred works, and do you have any
suggestions for solving this issue? :)
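
To make my current understanding concrete, here is a minimal plain-Scala model of it. This is not Akka code and I have not checked it against the FlexiMerge implementation. `Port`, `In0`, `In1`, `In2`, `PreferredReadModel`, `pick` and `simulate` are names made up for this sketch, and the assumption is that the preference only matters among ports that already have an element available at the moment of a read.

```scala
import scala.annotation.tailrec

// Hypothetical stand-ins for the three input ports; these are not Akka types.
sealed trait Port
case object In0 extends Port
case object In1 extends Port
case object In2 extends Port // the ack port, i.e. the preferred one

object PreferredReadModel {
  // Elements currently available on each port, oldest first.
  type Pending = Map[Port, Vector[String]]

  // Assumed rule: take the preferred port if it has an element right now,
  // otherwise the first secondary port that has one.
  def pick(preferred: Port, secondaries: List[Port], pending: Pending): Option[Port] = {
    def hasElement(p: Port): Boolean = pending.getOrElse(p, Vector.empty).nonEmpty
    if (hasElement(preferred)) Some(preferred) else secondaries.find(hasElement)
  }

  // arrivals(t) lists the elements that become available at step t.
  // Each step performs at most one read; the result is the read order.
  def simulate(preferred: Port,
               secondaries: List[Port],
               arrivals: List[List[(Port, String)]]): List[(Port, String)] = {
    @tailrec
    def loop(pending: Pending,
             remaining: List[List[(Port, String)]],
             acc: List[(Port, String)]): List[(Port, String)] = {
      val (now, later) = remaining match {
        case head :: tail => (head, tail)
        case Nil          => (Nil, Nil)
      }
      // Elements arriving at this step become available before the read.
      val available = now.foldLeft(pending) { case (m, (port, elem)) =>
        m.updated(port, m.getOrElse(port, Vector.empty[String]) :+ elem)
      }
      pick(preferred, secondaries, available) match {
        case Some(port) =>
          val queue = available(port)
          loop(available.updated(port, queue.tail), later, (port, queue.head) :: acc)
        case None if later.nonEmpty =>
          loop(available, later, acc) // nothing to read yet, wait for arrivals
        case None =>
          acc.reverse
      }
    }
    loop(Map.empty, arrivals, Nil)
  }
}
```

In this model, if "a1" (In0) and "b1" (In1) arrive first, then "a2" (In0), then "ack1" (In2), the read order is a1, a2, ack1, b1: the ack overtakes whatever is still waiting, but not what was read before it became available.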