This uses our disjunction wrapper type, Or, which is just a wrapper around
scalaz's disjunction
package object disjunction extends ToEitherOps {
type Or[L, R] = \/[L, R]
type Bad[L] = -\/[L]
type Good[R] = \/-[R]
object Bad {
def apply[L](left: L) = -\/(left)
def unapply[L, R](or: L Or R): Option[L] = or.swap.toOption
}
object Good {
def apply[R](right: R) = \/-(right)
def unapply[L, R](or: L Or R): Option[R] = or.toOption
}
}
This is a custom shape (using Akka Streams 1.0) that consumes a stream of
elements of type A Or B and produces a stream of elements of type A and a
stream of elements of type B
class OrRouterShape[A, B](_init: Init[A Or B] = Name[A Or B]("OrRouterShape"))
extends FanOutShape[A Or B](_init) {
val bad = newOutlet[A]("bad")
val good = newOutlet[B]("good")
protected override def construct(i: Init[A Or B]) = new OrRouterShape(i)
}
class OrRouter[A, B] extends FlexiRoute[A Or B, OrRouterShape[A, B]](
new OrRouterShape, Attributes.name("OrRouter")
) {
import FlexiRoute._
override def createRouteLogic(p: PortT) = new RouteLogic[A Or B] {
override def initialState = State[Any](DemandFromAll(p.outlets)) {
(ctx, _, element) =>
element match {
case Bad(left) =>
ctx.emit(p.bad)(left)
case Good(right) =>
ctx.emit(p.good)(right)
}
SameState
}
override def initialCompletionHandling = eagerClose
}
}
I can post a usage example later today, but that requires some sanitizing
to strip out the $MY_EMPLOYER specific stuff.
On Wednesday, November 11, 2015 at 2:18:05 PM UTC-8, Long Cao wrote:
>
> I'm actually coming right up on this very problem at work myself and I
> would love to see an example of what you're talking about. (My code follows
> something like Flow[Request, Try[Result], Unit] but I can transcribe
> whatever you present).
>
> On Wednesday, November 11, 2015 at 3:08:02 PM UTC-5, Paul Kinsky wrote:
>>
>> Let's assume that instead of exceptions each flow instead follows this
>> pattern: Flow[Request, Either[Error, Result], Unit]. You could build a
>> custom flow that routes errors to one output and results to the other, then
>> merge all the errors and results and send them to the sink. I've done this
>> in some code for work, if you're interested I can put together a sanitized
>> example.
>>
>> On Tuesday, November 10, 2015 at 7:23:44 PM UTC-8, Leon Ma wrote:
>>>
>>> Hi,
>>>
>>>
>>> Assuming I have below flow:
>>>
>>> Source(..).via(flow1).via(flow2).via(flow3)...via(flowN).toSink(..)
>>>
>>>
>>> I'd like to model the stream that any exception happens will lead the
>>> elements go to the very last flowN. (like a guardian fanalizer)
>>>
>>> For example, when processing elementA in flow2, we got an exception, it
>>> will just skip any subsequent flow and goes to flowN directly.
>>>
>>> How can I do this?
>>>
>>> Thanks
>>> Leon
>>>
>>>
>>>
>>>
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ:
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.