Interesting ideas! I think you're really homing in on what the Apache Beam
API is missing: error handling for bad data and runtime errors. I like your
method because it coalesces all the errors into a single collection that can
be inspected later. It also makes it easy to add a PAssert on the errors
collection.
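
To make sure I'm reading the proposal correctly, here is a rough sketch in plain Java (no Beam dependency) of the "one output collection, one errors collection" shape. Everything in it is hypothetical: `Failure`, `Result`, and `mapWithFailures` are stand-ins I made up for illustration, with lists in place of PCollections. It is not the API in the PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class MapWithFailuresSketch {

  /** Hypothetical stand-in for an error record: the failed input plus string metadata. */
  static final class Failure<InputT> {
    final InputT element;
    final String className;
    final String message;

    Failure(InputT element, Exception e) {
      this.element = element;
      this.className = e.getClass().getName();
      this.message = e.getMessage();
    }
  }

  /** Hypothetical stand-in for a result holding outputs and failures side by side. */
  static final class Result<InputT, OutputT> {
    final List<OutputT> output = new ArrayList<>();
    final List<Failure<InputT>> errors = new ArrayList<>();
  }

  /** Applies fn to every element; runtime exceptions are collected instead of rethrown. */
  static <InputT, OutputT> Result<InputT, OutputT> mapWithFailures(
      List<InputT> input, Function<InputT, OutputT> fn) {
    Result<InputT, OutputT> result = new Result<>();
    for (InputT element : input) {
      try {
        result.output.add(fn.apply(element));
      } catch (RuntimeException e) {
        result.errors.add(new Failure<>(element, e));
      }
    }
    return result;
  }

  public static void main(String[] args) {
    // The empty string makes 1 / word.length() throw ArithmeticException.
    Result<String, Integer> result =
        mapWithFailures(Arrays.asList("a", "", "bb"), word -> 1 / word.length());
    System.out.println(result.output);
    for (Failure<String> f : result.errors) {
      System.out.println(f.className + ": " + f.message);
    }
  }
}
```

Because all failures land in one list, a test only needs to assert on `result.errors`, which is the property I like about the proposal.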

Looks like others are also taking a stab at exception handling:
https://medium.com/@vallerylancey/error-handling-elements-in-apache-beam-pipelines-fffdea91af2a

I would also like to suggest another option: bringing Promises/A+-style
chaining to the apply method. That would make exception handling more
general than supporting it only in the map transforms.
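
As a very rough illustration of the promise-style idea, here is a plain Java sketch with no Beam dependency. `Attempt`, `then`, and `onError` are hypothetical names of my own, and this is a synchronous toy rather than a conforming Promises/A+ implementation; it only shows how a failure at any step could flow to a single handler at the end of a chain.

```java
import java.util.function.Function;

public class PromiseStyleSketch {

  /** Hypothetical promise-like wrapper: holds either a value or a captured exception. */
  static final class Attempt<T> {
    private final T value;
    private final Exception error;

    private Attempt(T value, Exception error) {
      this.value = value;
      this.error = error;
    }

    static <T> Attempt<T> of(T value) {
      return new Attempt<>(value, null);
    }

    /** Applies fn unless an earlier step failed; captures any runtime exception fn throws. */
    <R> Attempt<R> then(Function<? super T, ? extends R> fn) {
      if (error != null) {
        return new Attempt<>(null, error);
      }
      try {
        return new Attempt<>(fn.apply(value), null);
      } catch (RuntimeException e) {
        return new Attempt<>(null, e);
      }
    }

    /** Turns a captured exception back into a value; a no-op if nothing failed. */
    Attempt<T> onError(Function<? super Exception, ? extends T> handler) {
      if (error == null) {
        return this;
      }
      try {
        return new Attempt<>(handler.apply(error), null);
      } catch (RuntimeException e) {
        return new Attempt<>(null, e);
      }
    }

    T get() {
      return value;
    }
  }

  /** Chains two steps; a failure in either one skips ahead to the handler. */
  static String describe(String word) {
    return Attempt.of(word)
        .then(w -> 1 / w.length())
        .then(n -> "ok: " + n)
        .onError(e -> "error: " + e.getMessage())
        .get();
  }

  public static void main(String[] args) {
    System.out.println(describe("a"));
    System.out.println(describe(""));
  }
}
```

How something like this would map onto `apply` and PCollections is exactly the open question, so please read it as a conversation starter only.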

On Fri, Feb 8, 2019 at 9:53 AM Jeff Klukas <[email protected]> wrote:

> I'm looking for feedback on a new attempt at implementing an exception
> handling interface for map transforms as previously discussed on this list
> [0] and documented in JIRA [1]. I'd like for users to be able to pass a
> function into MapElements, FlatMapElements, etc. that potentially raises an
> exception without having to resort to rolling a completely custom ParDo
> with an additional output for failing elements.
>
> I have a PR open for review [2] that introduces an exception-handling
> interface that mimics the existing `into` and `via` methods of MapElements:
>
>  Result<PCollection<Integer>, String> result = words.apply(
>      MapElements.into(TypeDescriptors.integers())
>                 // throws ArithmeticException
>                 .via((String word) -> 1 / word.length())
>                 .withExceptions() // returns a MapWithFailures transform
>                 .into(TypeDescriptors.strings())
>                 .via(ee -> ee.exception().getMessage()));
>  PCollection<String> errors = result.errors();
>  PCollection<Integer> output = result.output();
>
>
>
> The example above is a bit more complex than I'd like, but gives users
> full control over what type handled exceptions are transformed into. It
> would be nice if we could simply create an error collection of some type
> that wraps the input element and the Exception directly, but there is still
> no general solution for encoding subclasses of exception, thus the need for
> some exception handling function (which in this example is the lambda
> passed to the second `via`).
>
> Let's call the above option 1.
>
> If we expect that most users will simply want to preserve the input
> element that failed and know general metadata about the exception
> (className, message, and stackTrace), we could instead optimize for a
> default solution where we return an instance of a new
> CodableException[InputT] type that wraps the input element and has
> additional string fields for className, message, and stackTrace:
>
>  Result<PCollection<Integer>, CodableException<String>> result = words.apply(
>      MapElements.into(TypeDescriptors.integers())
>                 .via((String word) -> 1 / word.length())
>                 .withExceptions());
>  PCollection<CodableException<String>> errors = result.errors();
>  PCollection<Integer> output = result.output();
>
> Let's call this option 2.
>
> It's less user code compared to option 1 and returns a richer error
> collection. I believe we'd be able to handle setting an appropriate coder
> behind the scenes, setting some composite coder that reuses the coder for
> the input collection in order to encode the InputT instance.
>
> I think we'd still need to provide some additional methods, though, if the
> user wants to set a custom exception handling function and custom coder.
> That would be for needs where a user wants to catch only a particular
> subclass of exception, or access additional methods of Exception (to access
> getCause() perhaps) or methods particular to an Exception subclass. The
> full API would end up being more complex compared to option 1, but it does
> make the default case much easier to use.
>
> If it's not fairly obvious what's going on in either of the above
> examples, then we likely haven't figured out an appropriate API yet.
> Reviews on the PR or commentary on the above two options would be
> appreciated.
>
> [0]
> https://lists.apache.org/thread.html/936ed2a5f2c01be066fd903abf70130625e0b8cf4028c11b89b8b23f@%3Cdev.beam.apache.org%3E
> [1] https://issues.apache.org/jira/browse/BEAM-5638
> [2] https://github.com/apache/beam/pull/7736
>
