One thing to remember is that an upstream failure is propagated downstream 
immediately, without backpressure, and can thereby overtake previously emitted 
(buffered) elements. Transforming such an error into an element further 
downstream may therefore result in an unexpected order of elements.

Another issue is that such a failure cancels the upstream, which is 
difficult to coordinate with a (later) downstream recovery.

It is certainly possible to implement this for a specific stage, but then it 
may be confusing that it only "catches" errors from the preceding stage.
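
To make the alternative concrete, here is a minimal sketch in plain Scala (no 
Akka dependency) of handling the failure inside the element function, so that 
the failure becomes an ordinary element, paired with the input that caused it, 
and keeps its place in the element order. The names keepInput and parseDelta 
and the sample inputs are hypothetical, made up for illustration; with a Flow 
one would presumably pass the wrapped function to map.

```scala
import scala.util.{Failure, Success, Try}

object RecoverSketch {
  // Hypothetical helper: wraps an element function so that a thrown
  // non-fatal exception becomes a Failure paired with the offending input.
  def keepInput[In, Out](f: In => Out): In => (In, Try[Out]) =
    in => (in, Try(f(in)))

  // Hypothetical element function that fails for some inputs.
  def parseDelta(s: String): Int = s.toInt

  def main(args: Array[String]): Unit = {
    // A plain List stands in for the stream of elements here.
    val results = List("1", "oops", "3").map(keepInput[String, Int](parseDelta))
    results.foreach {
      case (in, Success(out)) => println(s"$in -> $out")
      case (in, Failure(e))   => println(s"$in failed: ${e.getClass.getSimpleName}")
    }
  }
}
```

Because the failure is captured where it happens, nothing is cancelled and 
nothing can overtake earlier elements. The cost is that downstream has to work 
with (In, Try[Out]) instead of Out.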

This is just my 2c, so if you want a real assessment you are welcome to create 
a GitHub issue.

/Patrik

> On 20 May 2015 at 18:24, dpratt <[email protected]> wrote:
> 
> Sorry - hit send too soon.
> 
> foo.recover {
>   // tuple order follows the proposed PartialFunction[(In, Throwable), Out]
>   case (failedValue, NonFatal(e)) =>
>      log.error(e, "Problem processing stream value of {}", failedValue)
>      "UNKNOWN VALUE"
> }
> 
> 
>> On Wednesday, May 20, 2015 at 11:23:02 AM UTC-5, dpratt wrote:
>> What if I have an existing stage/Flow that I do not have control over, or 
>> where it would not make sense to conflate the flow logic with the exception 
>> handling?
>> 
>> For example
>> 
>> val foo: Flow[String, String, Unit] = 
>> SomeLibrary.somethingThatGeneratesAFlow()
>> 
>> how would I wrap foo with error handling? I can't use map or mapAsync, since 
>> those are compositional - namely, the value to map has already been 
>> calculated. What I really want is a recover block on the flow itself - 
>> something like
>> 
>> foo.recover {
>> 
>> }
>> 
>> 
>> 
>> 
>>> On Wednesday, May 20, 2015 at 4:37:37 AM UTC-5, Patrik Nordwall wrote:
>>> I think we considered adding this to the stream supervision mechanism, but 
>>> since it is not possible to express the types of the elements there in any 
>>> sane way we decided to not do it. Instead we said that this specific 
>>> recover scenario should be handled with try-catch within the function/stage 
>>> of yours. For mapAsync you can use recover on the Future.
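
As a rough sketch of the Future-based variant mentioned above, using only the 
Scala standard library: the function that would be handed to mapAsync recovers 
its own Future, while the input element is still in scope. The names lookup 
and safeLookup and the failure condition are hypothetical, chosen only for 
illustration.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.control.NonFatal

object FutureRecoverSketch {
  // Hypothetical asynchronous lookup that fails for some inputs.
  def lookup(id: String): Future[String] =
    Future {
      if (id.isEmpty) throw new IllegalArgumentException("empty id")
      s"record-$id"
    }

  // The failure is turned into a fallback value per element, so the
  // stage using this function never sees a failed Future.
  def safeLookup(id: String): Future[String] =
    lookup(id).recover { case NonFatal(_) => s"UNKNOWN VALUE for '$id'" }

  def main(args: Array[String]): Unit = {
    // Future.sequence stands in for the stream here and keeps input order.
    val all = Future.sequence(List("a", "", "c").map(safeLookup))
    println(Await.result(all, 5.seconds))
  }
}
```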
>>> 
>>> By the way, you can define the supervision for individual stages by using 
>>> the withAttributes.
>>> 
>>> Regards,
>>> Patrik
>>> 
>>>> On Fri, May 15, 2015 at 7:50 PM, dpratt <[email protected]> wrote:
>>>> I've been using the Streams API to build a few things for the past couple 
>>>> of months, and I have a humble suggestion for an API enhancement. I'm not 
>>>> sure if this is even possible given the contract of how a Flow operates, 
>>>> but adding a method to FlowOps with the following signature would be 
>>>> quite useful:
>>>> 
>>>> def recover(f: PartialFunction[(In, Throwable), Out]): Repr[Out, Mat]
>>>> 
>>>> It's likely due to the fact that I have yet to fully internalize the Flow 
>>>> API, but I've found that the supervision functionality isn't exactly what 
>>>> I need. On the top-level, it makes complete sense, but there is no way to 
>>>> deal with an error in a stream and not have at least one message silently 
>>>> dropped. It would be nice to be able to set up more fine-grained error 
>>>> handling. 
>>>> 
>>>> As an example, imagine a stream that was processing incoming deltas to a 
>>>> set of records held either in memory or some persistent data store. A 
>>>> failure of a given delta should not necessarily shut down the whole 
>>>> pipeline, but the associated record should be marked as inconsistent and 
>>>> dealt with appropriately. Using the current supervision API, there's no 
>>>> way to determine the actual element that caused the failure, and thus 
>>>> there's no real way to handle it or signal an external system with the 
>>>> details of the error.
>>>> 
>>>> Of course, you can work around this by making the stream operate on a 
>>>> Try[T] instead of T, but that just seems unwieldy. 
>>>> 
>>>> Am I looking at this the wrong way?
>>>> -- 
>>>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>>>> >>>>>>>>>> Check the FAQ: 
>>>> >>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
>>>> --- 
>>>> You received this message because you are subscribed to the Google Groups 
>>>> "Akka User List" group.
>>>> To unsubscribe from this group and stop receiving emails from it, send an 
>>>> email to [email protected].
>>>> To post to this group, send email to [email protected].
>>>> Visit this group at http://groups.google.com/group/akka-user.
>>>> For more options, visit https://groups.google.com/d/optout.
>>> 
>>> 
>>> 
>>> -- 
>>> Patrik Nordwall
>>> Typesafe -  Reactive apps on the JVM
>>> Twitter: @patriknw
>>> 
> 
