There seems to be no way to express this (simple) shape using a
`FlexiRoute`. A naive version:
    class SplitShape[A](init: FanOutShape.Init[A] = FanOutShape.Name[A]("Split"))
        extends FanOutShape[A](init) {
      val whenTrue = newOutlet[A]("whenTrue")
      val whenFalse = newOutlet[A]("whenFalse")
      protected def construct(init: FanOutShape.Init[A]) = new SplitShape(init)
    }

    class Split[A](predicate: A => Boolean)
        extends FlexiRoute[A, SplitShape[A]](new SplitShape[A],
          OperationAttributes.name("PartialOutput")) {
      import FlexiRoute._

      def createRouteLogic(p: PortT) = new RouteLogic[A] {
        def initialState = State(DemandFromAny(p)) { (ctx, port, element) =>
          import p._
          val result = predicate(element)
          (result, port) match {
            case (true, `whenTrue`) => ctx.emit(whenTrue)(element)
            case (false, `whenFalse`) => ctx.emit(whenFalse)(element)
            case (true, `whenFalse`) =>
              println("lost element " + element)
            case (false, `whenTrue`) =>
              println("lost element " + element)
            case _ => sys error "Unknown port"
          }
          SameState
        }
      }
    }
It seems (and I might be mistaken) that demand and the availability of
elements are tied together. If I replaced the 'lost element' cases with
code that places the elements in a buffer, the buffers would only grow:
the 'onInput' method is called only when input is available, not whenever
there is demand, regardless of input.
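
To make the buffering concern concrete, here is a small plain-Scala model of it. It does not use Akka at all: `BufferingSplit`, `Port`, `onInput` and `buffered` are hypothetical names made up for this sketch, and it assumes the callback behaves as I described above (one call per input element, with one outlet reported as having demand).

```scala
import scala.collection.mutable

object SplitBufferModel {
  // Hypothetical stand-ins for the two outlets of SplitShape.
  sealed trait Port
  case object WhenTrue extends Port
  case object WhenFalse extends Port

  // Models a Split that buffers mismatched elements instead of dropping them.
  final class BufferingSplit[A](predicate: A => Boolean) {
    private val pending = Map[Port, mutable.Queue[A]](
      WhenTrue -> mutable.Queue.empty[A],
      WhenFalse -> mutable.Queue.empty[A])

    // Assumption: invoked once per input element; `demanded` is the single
    // outlet that signalled demand. Returns what is emitted to it, if anything.
    def onInput(demanded: Port, element: A): Option[A] = {
      val target: Port = if (predicate(element)) WhenTrue else WhenFalse
      pending(target).enqueue(element)
      // Only the demanded outlet may receive an element in this call.
      val queue = pending(demanded)
      if (queue.nonEmpty) Some(queue.dequeue()) else None
    }

    // Total number of elements waiting in both buffers.
    def buffered: Int = pending.values.map(_.size).sum
  }

  def main(args: Array[String]): Unit = {
    val split = new BufferingSplit[Int](_ % 2 == 0)
    // Only whenTrue signals demand, while every input element is odd.
    (1 to 9 by 2).foreach(n => split.onInput(WhenTrue, n))
    println(s"buffered elements: ${split.buffered}") // prints "buffered elements: 5"
  }
}
```

In this model nothing ever drains the `WhenFalse` buffer unless a later input happens to arrive while `WhenFalse` has demand, which is the growth I am worried about.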