>From API docs: * Read condition for the [[MergeLogic#State]] that will be
* fulfilled when there are elements for any of the given upstream * inputs, however it differs from [[ReadAny]] in the case that both * the `preferred` and at least one other `secondary` input have demand, * the `preferred` input will always be consumed first. /Patrik On Wed, May 13, 2015 at 9:36 AM, anil chalil <[email protected]> wrote: > Hi, > > In Akka strem documentation > http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-RC2/scala/stream-graphs.html > section "Graph cycles, liveness and deadlocks" gives an example about how > to handle cycles with merge prefered. In my code i wrote fleximerge > > > class Multiplexer3[I1, I2, I3, O](logger:Logger,name: String, f: :+:[I1, > :+:[I2, :+:[I3, CNil]]] => Option[O]) >> extends FlexiMerge[O, FanInShape3[I1, I2, I3, O]](new FanInShape3[I1, I2, >> I3, O](name), OperationAttributes.name(name)) { >> >> import FlexiMerge._ >> >> override def createMergeLogic(p: FanInShape3[I1, I2, I3, O]): >> MergeLogic[O] = new MergeLogic[O] { >> override def initialState: State[_] = >> State(ReadPreferred(p.in2,List(p.in0,p.in1))) { (ctx, port, element) => >> logger.debug(s"got message from port $port and element $element") >> port match { >> case p.in0 => >> logger.debug(s"selected port one") >> f(Coproduct[:+:[I1, :+:[I2, :+:[I3, >> CNil]]]](element.asInstanceOf[I1])).foreach(ctx.emit) >> case p.in1 => >> logger.debug(s"selected port two") >> f(Coproduct[:+:[I1, :+:[I2, :+:[I3, >> CNil]]]](element.asInstanceOf[I2])).foreach(ctx.emit) >> case p.in2 => >> logger.debug(s"selected port three") >> f(Coproduct[:+:[I1, :+:[I2, :+:[I3, >> CNil]]]](element.asInstanceOf[I3])).foreach(ctx.emit) >> } >> SameState >> } >> >> override def eagerClose: CompletionHandling = eagerClose >> } >> } >> >> > It takes input from three port and applying a function over given element. > If function returns other than None it emits that returned element. In my > graph i made a cycle to represent acknowledgement messages and it feed > into "p.in2" in above graph. But as i understand and what i saw from my > tests that if other source components which i connect to p.in0 and p.in1 > are fast enough, they are filling buffer of that component and read > preferred does not solve that issue. As i understand read preferred is > working on the buffer which means if buffer is already filled with not > preferred messages then there is nothing to do other than handling my ack > messages in other other stage. I did not do it in this way because there is > some state about messages i sent so handling them in one stage allow me to > the that stage changes without locking or synchronization. Can you explain > how read preferred is working and i can take some suggestions to solve this > issue :) > > > > -- > >>>>>>>>>> Read the docs: http://akka.io/docs/ > >>>>>>>>>> Check the FAQ: > http://doc.akka.io/docs/akka/current/additional/faq.html > >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user > --- > You received this message because you are subscribed to the Google Groups > "Akka User List" group. > To unsubscribe from this group and stop receiving emails from it, send an > email to [email protected]. > To post to this group, send email to [email protected]. > Visit this group at http://groups.google.com/group/akka-user. > For more options, visit https://groups.google.com/d/optout. > -- Patrik Nordwall Typesafe <http://typesafe.com/> - Reactive apps on the JVM Twitter: @patriknw -- >>>>>>>>>> Read the docs: http://akka.io/docs/ >>>>>>>>>> Check the FAQ: >>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to [email protected]. To post to this group, send email to [email protected]. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
