Hi,
sometimes I need to "ignore" the output of a flow and use its input
instead further downstream. I'm aware of two possible ways to do that.
The first uses a broadcast:
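import akka.stream.scaladsl.{Broadcast, Flow, FlowGraph, Sink}

def bypass[I, O](flow: Flow[I, O, Any]): Flow[I, I, Any] = Flow() { implicit b =>
  import FlowGraph.Implicits._
  // Fan each element out to two branches
  val broadcast = b.add(Broadcast[I](2))
  // Branch 0: run the flow and discard its output
  broadcast.out(0) ~> flow ~> Sink.ignore
  // Branch 1: pass the original input through unchanged
  (broadcast.in, broadcast.out(1))
}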
The disadvantage of this one is that errors/exceptions from the flow are
not propagated downstream; they are swallowed by Sink.ignore. The second
way is zipping:
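import akka.stream.scaladsl.{Broadcast, Flow, FlowGraph, Zip}

def bypass[I, O](flow: Flow[I, O, Any]): Flow[I, I, Any] = Flow() { implicit b =>
  import FlowGraph.Implicits._
  val bcast = b.add(Broadcast[I](2))
  val zip = b.add(Zip[I, O]())
  // Pair each original input with the flow's output for it
  bcast.out(0) ~> zip.in0
  bcast.out(1) ~> flow ~> zip.in1
  // Emit only the original input from each pair
  (bcast.in, zip.out.map(_._1).outlet)
}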
But its disadvantage is that the flow being bypassed must emit exactly one
element per input. If it changes the number of elements (filter,
mapConcat, etc.), the two branches get out of step, the pairing is
corrupted, and the stream behaves unpredictably.
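To make the misalignment concrete, here is a rough sketch using plain Scala collections instead of a stream. It is only an analogy: the object name, the sample data, and the filtering "flow" are made up for illustration, and a real stream is also subject to backpressure, so it may stall rather than simply truncate.

```scala
object ZipMisalignment {
  // Hypothetical inputs entering the bypass
  val inputs: List[Int] = List(1, 2, 3, 4)

  // Stand-in for a bypassed flow that filters: it drops odd elements,
  // so it emits fewer elements than it receives
  val outputs: List[String] =
    inputs.filter(_ % 2 == 0).map(n => s"processed-$n")

  // Zipping pairs elements by position, not by origin
  val pairs: List[(Int, String)] = inputs.zip(outputs)

  // What the bypass would pass on after map(_._1)
  val emitted: List[Int] = pairs.map(_._1)
}
```

Here input 1 is paired with the result for 2, input 2 with the result for 4, and inputs 3 and 4 never come out at all.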
Does anybody know a better way of doing this?