Sure, I was thinking of treating the apply as a promise (making use of your
CodableException idea as well):
```
// Proposed API sketch: `then` does not exist on PCollection today.
PCollection<...> result = words.apply(new SomeUserDoFn())
    .then(new SomeOtherDoFn())
    .then(new OtherDoFn(),
        // Error handler for rejected steps
        (CodableException<...> e) -> {
          logger.info(e.getMessage());
          return e;
        });
```
The idea is to treat each apply in the pipeline as an asynchronous
operation where each step is either "fulfilled" or "rejected". The
promises can then be chained together as above.
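Plain Java already has a promise-like type, `CompletableFuture`, and a small example may make the fulfilled/rejected chaining concrete. This is only an analogy, assuming Beam steps would behave like promise stages; it uses no Beam API, and the names `PromiseChainSketch` and `process` are made up for illustration. A step that throws rejects the chain, the remaining `thenApply` stages are skipped, and `exceptionally` plays the role of the error handler.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical illustration only: promise-style chaining with CompletableFuture,
// not a Beam API.
class PromiseChainSketch {
  // Runs one element through a chain of steps. Each thenApply stage is either
  // fulfilled with a value or rejected with an exception; a rejection skips the
  // remaining thenApply stages and is handled in exceptionally().
  static String process(String word) {
    return CompletableFuture.completedFuture(word)
        .thenApply(w -> 1 / w.length())   // rejected for "" (ArithmeticException)
        .thenApply(n -> "ok:" + n)
        .exceptionally(e -> {
          // Failures in dependent stages arrive wrapped in CompletionException.
          Throwable cause =
              (e instanceof CompletionException && e.getCause() != null) ? e.getCause() : e;
          return "error:" + cause.getClass().getSimpleName();
        })
        .join();
  }

  public static void main(String[] args) {
    System.out.println(process("beam")); // ok:0
    System.out.println(process(""));     // error:ArithmeticException
  }
}
```

Unlike a pipeline, this handles a single element at a time; in the proposal, each failing element would presumably be routed to the error handler rather than failing the whole job.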
On Mon, Feb 11, 2019 at 1:47 PM Jeff Klukas wrote:
> Vallery Lancey's post is definitely one of the viewpoints incorporated
> into this approach. I neglected to include that link in this iteration, but
> it was discussed in the previous thread.
>
> Can you explain more about "another option that adds A+ Promise spec into
> the apply method"? I'm failing to parse what that means.
>
> On Mon, Feb 11, 2019 at 4:23 PM Sam Rohde wrote:
>
>> Interesting ideas! I think you're really homing in on what the Apache
>> Beam API is missing: error handling for bad data and runtime errors. I like
>> your method because it coalesces all the errors into a single collection to
>> be looked at later. It also makes it easy to add a PAssert on the errors
>> collection.
>>
>> Looks like others are also taking a stab at exception handling:
>> https://medium.com/@vallerylancey/error-handling-elements-in-apache-beam-pipelines-fffdea91af2a
>>
>> I would also like to add another option that adds A+ Promise spec into
>> the apply method. This would make exception handling more general than
>> supporting it only in the Map transforms.
>>
>> On Fri, Feb 8, 2019 at 9:53 AM Jeff Klukas wrote:
>>
>>> I'm looking for feedback on a new attempt at implementing an
>>> exception-handling interface for map transforms, as previously discussed on
>>> this list [0] and documented in JIRA [1]. I'd like users to be able to pass
>>> a function that may throw an exception into MapElements, FlatMapElements,
>>> etc. without having to resort to rolling a completely custom ParDo with an
>>> additional output for failing elements.
>>>
>>> I have a PR open for review [2] that introduces an exception-handling
>>> interface that mimics the existing `into` and `via` methods of MapElements:
>>>
>>> Result<PCollection<Integer>, String> result = words.apply(
>>>     MapElements.into(TypeDescriptors.integers())
>>>         .via((String word) -> 1 / word.length()) // throws ArithmeticException for empty words
>>>         .withExceptions() // returns a MapWithFailures transform
>>>         .into(TypeDescriptors.strings())
>>>         .via(ee -> ee.exception().getMessage()));
>>> PCollection<String> errors = result.errors();
>>> PCollection<Integer> output = result.output();
>>>
>>> The example above is a bit more complex than I'd like, but it gives users
>>> full control over the type that handled exceptions are transformed into. It
>>> would be nice if we could simply create an error collection of some type
>>> that wraps the input element and the Exception directly, but there is still
>>> no general solution for encoding subclasses of Exception, hence the need for
>>> an exception-handling function (which in this example is the lambda
>>> passed to the second `via`).
>>>
>>> Let's call the above option 1.
>>>
>>> If we expect that most users will simply want to preserve the input
>>> element that failed and know general metadata about the exception
>>> (className, message, and stackTrace), we could instead optimize for a
>>> default solution where we return an instance of a new
>>> CodableException<InputT> type that wraps the input element and has
>>> additional string fields for className, message, and stackTrace:
>>>
>>> Result<PCollection<Integer>, CodableException<String>> result = words.apply(
>>>     MapElements.into(TypeDescriptors.integers())
>>>         .via((String word) -> 1 / word.length())
>>>         .withExceptions());
>>> PCollection<CodableException<String>> errors = result.errors();
>>> PCollection<Integer> output = result.output();
>>>
>>> Let's call this option 2.
>>>
>>> It requires less user code than option 1 and returns a richer error
>>> collection. I believe we could set an appropriate coder behind the scenes:
>>> a composite coder that reuses the input collection's coder to encode the
>>> InputT instance.
>>>
>>> I think we'd still need to provide some additional methods, though, for
>>> users who want to set a custom exception-handling function and a custom
>>> coder. That would cover cases where a user wants to catch only a
>>> particular subclass of Exception, or to call additional methods of
>>> Exception (getCause(), perhaps) or methods specific to an Exception
>>> subclass. The full API would end up more complex than option 1, but it
>>> does make the default case much easier to use.
>>>
>>> If it's not fairly obvious what's going on in either of the above
>>> examples, then we likely haven't figured out an appropriate API yet.
>>> Reviews on the PR or commentary on the above two options would be
>>> appreciated.
>>>
>>> [0]
>>> https://lists.apache.org/thread.html/936ed2a5f2c01be066fd903abf70130625e0b8cf4028c11b89b8b23f@%3Cdev.beam.apache.org%3E
>>> [1] https://issues.apache.org/jira/browse/BEAM-5638
>>> [2] https://github.com/apache/beam/pull/7736
>>>
>>
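To make the semantics of option 2 concrete outside Beam, here is a plain-Java sketch that maps over an in-memory list and routes failures to a separate list. Everything in it is hypothetical: `FailedElement` only mirrors the fields proposed for `CodableException<InputT>` (the input element plus className, message, and stackTrace as strings), `MapResult` stands in for `Result`, and the `caught` parameter illustrates catching only a particular subclass of exception. It is not Beam's API and does not deal with coders.

```java
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical plain-Java sketch of "map with exceptions"; not a Beam API.
class MapWithExceptionsSketch {
  // Stand-in for the proposed CodableException<InputT>: the failing input
  // element plus exception metadata flattened to strings.
  static final class FailedElement<InputT> {
    final InputT element;
    final String className;
    final String message;
    final String stackTrace;

    FailedElement(InputT element, Exception e) {
      this.element = element;
      this.className = e.getClass().getName();
      this.message = e.getMessage();
      StringWriter sw = new StringWriter();
      e.printStackTrace(new PrintWriter(sw));
      this.stackTrace = sw.toString();
    }
  }

  // In-memory analogue of Result: successful outputs and failures side by side.
  static final class MapResult<InputT, OutputT> {
    final List<OutputT> output = new ArrayList<>();
    final List<FailedElement<InputT>> errors = new ArrayList<>();
  }

  // Applies fn to every input. Runtime exceptions that are instances of
  // `caught` are recorded as FailedElements; any other exception propagates.
  static <InputT, OutputT> MapResult<InputT, OutputT> map(
      List<InputT> inputs, Function<InputT, OutputT> fn, Class<? extends Exception> caught) {
    MapResult<InputT, OutputT> result = new MapResult<>();
    for (InputT in : inputs) {
      try {
        result.output.add(fn.apply(in));
      } catch (RuntimeException e) {
        if (!caught.isInstance(e)) {
          throw e;
        }
        result.errors.add(new FailedElement<>(in, e));
      }
    }
    return result;
  }

  public static void main(String[] args) {
    MapResult<String, Integer> r =
        map(List.of("beam", "", "pipeline"), w -> 1 / w.length(), ArithmeticException.class);
    System.out.println(r.output);                  // [0, 0]
    System.out.println(r.errors.get(0).className); // java.lang.ArithmeticException
  }
}
```

Keeping the exception metadata as plain strings is what would let a coder for such a type be composed from the input element's coder plus a string coder, which is the motivation given for option 2.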