[
https://issues.apache.org/jira/browse/BEAM-5638?focusedWorklogId=200964&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-200964
]
ASF GitHub Bot logged work on BEAM-5638:
----------------------------------------
Author: ASF GitHub Bot
Created on: 19/Feb/19 23:10
Start Date: 19/Feb/19 23:10
Worklog Time Spent: 10m
Work Description: reuvenlax commented on pull request #7736: [BEAM-5638]
Exception handling for Java MapElements and FlatMapElements
URL: https://github.com/apache/beam/pull/7736#discussion_r258254032
##########
File path:
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
##########
@@ -156,4 +160,190 @@ public void populateDisplayData(DisplayData.Builder
builder) {
builder.include("fn", (HasDisplayData) originalFnForDisplayData);
}
}
+
+ /**
+ * Return a modified {@code PTransform} that catches exceptions raised while
mapping elements.
+ *
+ * <p>The user must call {@code via} on the returned {@link
MapWithExceptions} instance to define
+ * an exception handler. If the handler does not provide sufficient type
information, the user
+ * must also call {@code into} to define a type descriptor for the error
collection.
+ *
+ * <p>See {@link WithExceptions} documentation for usage patterns of the
returned {@link
+ * WithExceptions.Result}.
+ *
+ * @return a {@link WithExceptions.Result} wrapping the output and error
collections
+ */
+ @Experimental(Kind.WITH_EXCEPTIONS)
+ public MapWithExceptions<InputT, OutputT, ?> withExceptions() {
+ return new MapWithExceptions<>(fn, originalFnForDisplayData, inputType,
outputType, null, null);
+ }
+
+ /** Implementation of {@link MapElements#withExceptions()}. */
+ @Experimental(Kind.WITH_EXCEPTIONS)
+ public static class MapWithExceptions<InputT, OutputT, FailureT>
+ extends PTransform<
+ PCollection<InputT>, WithExceptions.Result<PCollection<OutputT>,
FailureT>> {
+
+ private final transient TypeDescriptor<InputT> inputType;
+ private final transient TypeDescriptor<OutputT> outputType;
+ @Nullable private final transient TypeDescriptor<FailureT> failureType;
+ private final transient Object originalFnForDisplayData;
+ private final Contextful<Fn<InputT, OutputT>> fn;
+ @Nullable private final ProcessFunction<ExceptionElement<InputT>,
FailureT> exceptionHandler;
+
+ MapWithExceptions(
+ Contextful<Fn<InputT, OutputT>> fn,
+ Object originalFnForDisplayData,
+ TypeDescriptor<InputT> inputType,
+ TypeDescriptor<OutputT> outputType,
+ @Nullable ProcessFunction<ExceptionElement<InputT>, FailureT>
exceptionHandler,
+ @Nullable TypeDescriptor<FailureT> failureType) {
+ this.fn = fn;
+ this.originalFnForDisplayData = originalFnForDisplayData;
+ this.inputType = inputType;
+ this.outputType = outputType;
+ this.exceptionHandler = exceptionHandler;
+ this.failureType = failureType;
+ }
+
+ /**
+ * Returns a new {@link MapWithExceptions} transform with the given type
descriptor for the
+ * error collection, but the exception handler yet to be specified using
{@link
+ * #via(ProcessFunction)}.
+ */
+ public <NewFailureT> MapWithExceptions<InputT, OutputT, NewFailureT> into(
+ TypeDescriptor<NewFailureT> failureTypeDescriptor) {
+ return new MapWithExceptions<>(
+ fn, originalFnForDisplayData, inputType, outputType, null,
failureTypeDescriptor);
+ }
+
+ /**
+ * Returns a {@code PTransform} that catches exceptions raised while
mapping elements, passing
+ * the raised exception instance and the input element being processed
through the given {@code
+ * exceptionHandler} and emitting the result to an error collection.
+ *
+ * <p>Example usage:
+ *
+ * <pre>{@code
+ * Result<PCollection<Integer>, String> result = words.apply(
+ * MapElements.into(TypeDescriptors.integers())
+ * .via((String word) -> 1 / word.length())
+ * .withExceptions()
+ * .into(TypeDescriptors.strings())
+ * .via(ee -> e.exception().getMessage()));
+ * PCollection<String> errors = result.errors();
+ * }</pre>
+ */
+ public MapWithExceptions<InputT, OutputT, FailureT> via(
+ ProcessFunction<ExceptionElement<InputT>, FailureT> exceptionHandler) {
+ return new MapWithExceptions<>(
+ fn, originalFnForDisplayData, inputType, outputType,
exceptionHandler, failureType);
+ }
+
+ /**
+ * Like {@link #via(ProcessFunction)}, but takes advantage of the type
information provided by
+ * {@link InferableFunction}, meaning that a call to {@link
#into(TypeDescriptor)} may not be
+ * necessary.
+ *
+ * <p>Example usage:
+ *
+ * <pre>{@code
+ * Result<PCollection<Integer>, KV<String, Map<String, String>>> result =
words.apply(
+ * MapElements.into(TypeDescriptors.integers())
+ * .via((String word) -> 1 / word.length())
+ * .withExceptions()
+ * .via(new WithExceptions.ExceptionAsMapHandler<String>()
{}));
+ * PCollection<KV<String, Map<String, String>>> errors = result.errors();
+ * }</pre>
+ */
+ public <NewFailureT> MapWithExceptions<InputT, OutputT, NewFailureT> via(
+ InferableFunction<ExceptionElement<InputT>, NewFailureT>
exceptionHandler) {
+ return new MapWithExceptions<>(
+ fn,
+ originalFnForDisplayData,
+ inputType,
+ outputType,
+ exceptionHandler,
+ exceptionHandler.getOutputTypeDescriptor());
+ }
+
+ @Override
+ public WithExceptions.Result<PCollection<OutputT>, FailureT>
expand(PCollection<InputT> input) {
+ final TupleTag<OutputT> outputTag = new TupleTag<OutputT>() {};
+ final TupleTag<FailureT> failureTag;
+ if (failureType == null) {
+ failureTag = new TupleTag<>();
+ } else {
+ failureTag =
+ new TupleTag<FailureT>() {
+ @Override
+ public TypeDescriptor<FailureT> getTypeDescriptor() {
+ return failureType;
+ }
+ };
+ }
+ DoFn<InputT, OutputT> doFn =
+ new DoFn<InputT, OutputT>() {
Review comment:
Same comment as before: could we create a single DoFn shared by both?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
Issue Time Tracking
-------------------
Worklog Id: (was: 200964)
Time Spent: 7.5h (was: 7h 20m)
Remaining Estimate: 160.5h (was: 160h 40m)
> Add exception handling to single message transforms in Java SDK
> ---------------------------------------------------------------
>
> Key: BEAM-5638
> URL: https://issues.apache.org/jira/browse/BEAM-5638
> Project: Beam
> Issue Type: Improvement
> Components: sdk-java-core
> Reporter: Jeff Klukas
> Assignee: Jeff Klukas
> Priority: Minor
> Labels: triaged
> Original Estimate: 168h
> Time Spent: 7.5h
> Remaining Estimate: 160.5h
>
> Add methods to MapElements, FlatMapElements, and Filter that allow users to
> specify expected exceptions and tuple tags to associate with the
> collections of successfully and unsuccessfully processed elements.
> See discussion on dev list:
> https://lists.apache.org/thread.html/936ed2a5f2c01be066fd903abf70130625e0b8cf4028c11b89b8b23f@%3Cdev.beam.apache.org%3E
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)