That's exactly my question. I'd buffer Bs until Bs are demanded, but the docs say that a FlexiRoute should not hold mutable state, and buffering could lead to uncontrolled buffer growth. Am I doing it all wrong? Do I need something other than FlexiRoute?
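To make the buffering concern concrete, here is a minimal sketch in plain Scala with no Akka types. `BufferingRouter`, `onInput`, `demandA`/`demandB`/`demandC` and `buffered` are hypothetical names invented for illustration; none of them are part of the FlexiRoute API, and the sketch only models the bookkeeping, not real stream demand.

```scala
import scala.collection.mutable

sealed trait Message
final class A extends Message
final class B extends Message
final class C extends Message

// Hypothetical stand-in for a routing stage; this is NOT the FlexiRoute API.
final class BufferingRouter {
  // One queue per message type: the mutable state the docs warn about.
  private val as = mutable.Queue.empty[A]
  private val bs = mutable.Queue.empty[B]
  private val cs = mutable.Queue.empty[C]

  // Upstream delivered an element: park it in the queue for its type.
  def onInput(m: Message): Unit = m match {
    case a: A => as.enqueue(a)
    case b: B => bs.enqueue(b)
    case c: C => cs.enqueue(c)
  }

  // An outlet signalled demand: hand over the oldest matching element, if any.
  def demandA(): Option[A] = if (as.isEmpty) None else Some(as.dequeue())
  def demandB(): Option[B] = if (bs.isEmpty) None else Some(bs.dequeue())
  def demandC(): Option[C] = if (cs.isEmpty) None else Some(cs.dequeue())

  // Total number of elements currently parked across all three queues.
  def buffered: Int = as.size + bs.size + cs.size
}

object BufferGrowthDemo extends App {
  val router = new BufferingRouter
  // Upstream alternates A and B, but only the A outlet ever demands.
  for (_ <- 1 to 1000) {
    router.onInput(new A)
    router.onInput(new B)
    router.demandA()
  }
  println(router.buffered) // prints 1000: every B is still parked
}
```

In this sketch nothing limits how many elements are parked, so if one outlet never demands, `buffered` grows for as long as upstream keeps delivering elements of that type.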
On Thursday, April 9, 2015 at 8:42:14 PM UTC+3, √ wrote:
>
> What will you do if you get Bs but there is only demand for As?
>
> --
> Cheers,
> √
> On 9 Apr 2015 19:10, "Andrey Kuznetsov" <[email protected]> wrote:
>
>> If I change the State condition to DemandFromAll, the stage will wait for
>> all outlets to start demanding, but I need elements to be emitted as soon
>> as possible.
>>
>> On Thursday, April 9, 2015 at 7:58:45 PM UTC+3, Andrey Kuznetsov wrote:
>>>
>>> I need to create a stage with one inlet accepting elements of type
>>> Message and three outlets emitting elements of types A, B and C, all of
>>> which are Message subtypes.
>>> Depending on which element arrives (A, B or C), it should be emitted on
>>> the corresponding outlet.
>>> How can I achieve this? If I create a State with
>>> DemandFromAny(all three outlets), I have no guarantee that the element
>>> passed to the State's onInput is an instance of the type that the
>>> demanding outlet should emit.
>>> Here is my FlexiRoute example (I simplified it to the Message, A, B and C
>>> types to make it independent of my app context). It works only when I am
>>> lucky enough that the element matches the type of the demanding outlet.
>>>
>>> sealed trait Message
>>> class A extends Message
>>> class B extends Message
>>> class C extends Message
>>>
>>> class MessageDiscriminatorShape(_init: Init[Message] =
>>>     Name[Message]("MessageDiscriminator"))
>>>   extends FanOutShape[Message](_init) {
>>>   val outA = newOutlet[A]("outA")
>>>   val outB = newOutlet[B]("outB")
>>>   val outC = newOutlet[C]("outC")
>>>
>>>   protected override def construct(i: Init[Message]) =
>>>     new MessageDiscriminatorShape(i)
>>> }
>>>
>>> class MessageDiscriminator
>>>   extends FlexiRoute[Message, MessageDiscriminatorShape](
>>>     new MessageDiscriminatorShape,
>>>     OperationAttributes.name("MessageDiscriminator")) {
>>>   import FlexiRoute._
>>>
>>>   override def createRouteLogic(p: PortT) = new RouteLogic[Message] {
>>>     override def initialState = State[Any](DemandFromAny(p.outlets)) {
>>>       (ctx, _, element) =>
>>>         element match {
>>>           case e: A =>
>>>             ctx.emit(p.outA)(e)
>>>           case e: B =>
>>>             ctx.emit(p.outB)(e)
>>>           case e: C =>
>>>             ctx.emit(p.outC)(e)
>>>         }
>>>
>>>         SameState
>>>     }
>>>
>>>     override def initialCompletionHandling = eagerClose
>>>   }
>>> }
>>>
>> --
>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>> >>>>>>>>>> Check the FAQ:
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
>> ---
>> You received this message because you are subscribed to the Google Groups
>> "Akka User List" group.
>> To unsubscribe from this group and stop receiving emails from it, send an
>> email to [email protected].
>> To post to this group, send email to [email protected].
>> Visit this group at http://groups.google.com/group/akka-user.
>> For more options, visit https://groups.google.com/d/optout.
