[ 
https://issues.apache.org/jira/browse/BEAM-5638?focusedWorklogId=201026&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-201026
 ]

ASF GitHub Bot logged work on BEAM-5638:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 20/Feb/19 01:47
            Start Date: 20/Feb/19 01:47
    Worklog Time Spent: 10m 
      Work Description: jklukas commented on pull request #7736: [BEAM-5638] 
Exception handling for Java MapElements and FlatMapElements
URL: https://github.com/apache/beam/pull/7736#discussion_r258304373
 
 

 ##########
 File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
 ##########
 @@ -170,4 +174,193 @@ public void populateDisplayData(DisplayData.Builder 
builder) {
       builder.include("fn", (HasDisplayData) originalFnForDisplayData);
     }
   }
+
+  /**
+   * Return a modified {@code PTransform} that catches exceptions raised while 
mapping elements.
+   *
+   * <p>The user must call {@code via} on the returned {@link 
FlatMapWithExceptions} instance to
+   * define an exception handler. If the handler does not provide sufficient 
type information, the
+   * user must also call {@code into} to define a type descriptor for the 
error collection.
+   *
+   * <p>See {@link WithExceptions} documentation for usage patterns of the 
returned {@link
+   * WithExceptions.Result}.
+   *
+   * @return a {@link FlatMapWithExceptions} transform that, once applied, 
produces a {@link
+   * WithExceptions.Result} wrapping the output and error collections
+   */
+  @Experimental(Experimental.Kind.WITH_EXCEPTIONS)
+  public FlatMapWithExceptions<InputT, OutputT, ?> withExceptions() {
+    return new FlatMapWithExceptions<>(
+        fn, originalFnForDisplayData, inputType, outputType, null, null);
+  }
+
+  /** Implementation of {@link FlatMapElements#withExceptions()}. */
+  @Experimental(Experimental.Kind.WITH_EXCEPTIONS)
+  public static class FlatMapWithExceptions<InputT, OutputT, FailureT>
+      extends PTransform<
+          PCollection<InputT>, WithExceptions.Result<PCollection<OutputT>, 
FailureT>> {
+
+    private final transient TypeDescriptor<InputT> inputType;
+    private final transient TypeDescriptor<OutputT> outputType;
+    @Nullable private final transient TypeDescriptor<FailureT> failureType;
+    private final transient Object originalFnForDisplayData;
+    @Nullable private final Contextful<Fn<InputT, Iterable<OutputT>>> fn;
+    @Nullable private final ProcessFunction<ExceptionElement<InputT>, 
FailureT> exceptionHandler;
+
+    FlatMapWithExceptions(
+        @Nullable Contextful<Fn<InputT, Iterable<OutputT>>> fn,
+        Object originalFnForDisplayData,
+        TypeDescriptor<InputT> inputType,
+        TypeDescriptor<OutputT> outputType,
+        @Nullable ProcessFunction<ExceptionElement<InputT>, FailureT> 
exceptionHandler,
+        @Nullable TypeDescriptor<FailureT> failureType) {
+      this.fn = fn;
+      this.originalFnForDisplayData = originalFnForDisplayData;
+      this.inputType = inputType;
+      this.outputType = outputType;
+      this.exceptionHandler = exceptionHandler;
+      this.failureType = failureType;
+    }
+
+    /**
+     * Returns a new {@link FlatMapWithExceptions} transform with the given 
type descriptor for the
+     * error collection, but the exception handler yet to be specified using 
{@link
+     * #via(ProcessFunction)}.
+     */
+    public <NewFailureT> FlatMapWithExceptions<InputT, OutputT, NewFailureT> 
into(
+        TypeDescriptor<NewFailureT> failureTypeDescriptor) {
+      return new FlatMapWithExceptions<>(
+          fn, originalFnForDisplayData, inputType, outputType, null, 
failureTypeDescriptor);
+    }
+
+    /**
+     * Returns a {@code PTransform} that catches exceptions raised while 
mapping elements, passing
+     * the raised exception instance and the input element being processed 
through the given {@code
+     * exceptionHandler} and emitting the result to an error collection.
+     *
+     * <p>Example usage:
+     *
+     * <pre>{@code
+     * Result<PCollection<String>, String> result = words.apply(
+     *     FlatMapElements.into(TypeDescriptors.strings())
+     *         .via((String line) -> 
Arrays.asList(Arrays.copyOfRange(line.split(" "), 1, 5)))
+     *         .withExceptions()
+     *         .into(TypeDescriptors.strings())
+     *         .via(ee -> ee.exception().getMessage()));
+     * PCollection<String> errors = result.errors();
+     * }</pre>
+     */
+    public FlatMapWithExceptions<InputT, OutputT, FailureT> via(
+        ProcessFunction<ExceptionElement<InputT>, FailureT> exceptionHandler) {
+      return new FlatMapWithExceptions<>(
+          fn, originalFnForDisplayData, inputType, outputType, 
exceptionHandler, failureType);
+    }
+
+    /**
+     * Like {@link #via(ProcessFunction)}, but takes advantage of the type 
information provided by
+     * {@link InferableFunction}, meaning that a call to {@link 
#into(TypeDescriptor)} may not be
+     * necessary.
+     *
+     * <p>Example usage:
+     *
+     * <pre>{@code
+     * Result<PCollection<String>, KV<String, Map<String, String>>> result = 
words.apply(
+     *     FlatMapElements.into(TypeDescriptors.strings())
+     *         .via((String line) -> 
Arrays.asList(Arrays.copyOfRange(line.split(" "), 1, 5)))
+     *         .withExceptions()
+     *         .via(new WithExceptions.ExceptionAsMapHandler<String>() {}));
+     * PCollection<KV<String, Map<String, String>>> errors = result.errors();
+     * }</pre>
+     */
+    public <NewFailureT> FlatMapWithExceptions<InputT, OutputT, NewFailureT> 
via(
+        InferableFunction<ExceptionElement<InputT>, NewFailureT> 
exceptionHandler) {
+      return new FlatMapWithExceptions<>(
+          fn,
+          originalFnForDisplayData,
+          inputType,
+          outputType,
+          exceptionHandler,
+          exceptionHandler.getOutputTypeDescriptor());
+    }
+
+    @Override
+    public WithExceptions.Result<PCollection<OutputT>, FailureT> 
expand(PCollection<InputT> input) {
+      final TupleTag<OutputT> outputTag = new TupleTag<OutputT>() {};
+      final TupleTag<FailureT> failureTag;
+      if (failureType == null) {
+        failureTag = new TupleTag<>();
+      } else {
+        failureTag =
+            new TupleTag<FailureT>() {
+              @Override
+              public TypeDescriptor<FailureT> getTypeDescriptor() {
+                return failureType;
+              }
+            };
+      }
+      DoFn<InputT, OutputT> doFn =
+          new DoFn<InputT, OutputT>() {
+            @ProcessElement
+            public void processElement(
+                @Element InputT element, MultiOutputReceiver receiver, 
ProcessContext c)
+                throws Exception {
+              boolean exceptionWasThrown = false;
+              Iterable<OutputT> res = null;
+              try {
+                res = fn.getClosure().apply(c.element(), 
Fn.Context.wrapProcessContext(c));
+              } catch (Exception e) {
+                exceptionWasThrown = true;
+                ExceptionElement<InputT> exceptionElement = 
ExceptionElement.of(element, e);
+                
receiver.get(failureTag).output(exceptionHandler.apply(exceptionElement));
+              }
+              if (!exceptionWasThrown) {
+                for (OutputT output : res) {
 
 Review comment:
   I'm hesitant to put anything into the `try` except for the single line where 
we apply `fn`. I have similar code running in a Dataflow pipeline. I 
originally sent elements to the main output receiver within the `try` block, 
but I saw cases where some elements appeared in both the output and error 
collections. Once I moved the receiver code out of the `try` block, I instead 
saw pipeline exceptions bubbling all the way up.
   
   I don't have a full understanding of what was happening, but I think it may 
be related to fusion; essentially, I think the try block was catching an 
exception raised by code from an adjacent transform. I don't know anything 
about how fusion is implemented, but is it possible that calling `.output` on a 
receiver is actually calling code from a fused transform?
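
   A minimal plain-Java sketch of the suspected mechanism, assuming that 
emitting to a receiver can synchronously run downstream code. It does not use 
Beam and says nothing about how fusion is really implemented; 
`TryScopeSketch`, `SPLIT`, `downstream`, `wideTry`, and `narrowTry` are 
hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

/** Hypothetical illustration only; none of these names are Beam APIs. */
public class TryScopeSketch {

  /** Stand-in for the user's flat-map function. */
  static final Function<String, List<String>> SPLIT = line -> Arrays.asList(line.split(" "));

  /** Stand-in for a fused downstream step: records the word, then fails on "boom". */
  static Consumer<String> downstream(List<String> mainOutput) {
    return word -> {
      mainOutput.add(word);
      if (word.equals("boom")) {
        throw new IllegalStateException("downstream failure");
      }
    };
  }

  /** Emits inside the try block, so downstream failures are caught as well. */
  static void wideTry(String element, Consumer<String> main, Consumer<String> errors) {
    try {
      for (String word : SPLIT.apply(element)) {
        main.accept(word);
      }
    } catch (RuntimeException e) {
      errors.accept(element + ": " + e.getMessage());
    }
  }

  /** Only the call to the user function is guarded; emitting happens outside. */
  static void narrowTry(String element, Consumer<String> main, Consumer<String> errors) {
    List<String> words;
    try {
      words = SPLIT.apply(element);
    } catch (RuntimeException e) {
      errors.accept(element + ": " + e.getMessage());
      return;
    }
    for (String word : words) {
      main.accept(word);
    }
  }

  public static void main(String[] args) {
    List<String> out = new ArrayList<>();
    List<String> errs = new ArrayList<>();
    wideTry("ok boom", downstream(out), errs::add);
    // The same input now contributes to both collections.
    System.out.println("wide:   out=" + out + " errors=" + errs);

    out.clear();
    errs.clear();
    try {
      narrowTry("ok boom", downstream(out), errs::add);
    } catch (IllegalStateException e) {
      // The downstream failure propagates instead of being misreported.
      System.out.println("narrow: propagated '" + e.getMessage() + "', errors=" + errs);
    }
  }
}
```

   With the wide `try`, the input `"ok boom"` lands in the main output and 
also produces an error record, even though `SPLIT` itself never failed. With 
the narrow `try`, the downstream exception propagates to the caller and the 
error collection stays empty, which matches both behaviors described above.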
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

            Worklog Id:     (was: 201026)
            Time Spent: 7h 50m  (was: 7h 40m)
    Remaining Estimate: 160h 10m  (was: 160h 20m)

> Add exception handling to single message transforms in Java SDK
> ---------------------------------------------------------------
>
>                 Key: BEAM-5638
>                 URL: https://issues.apache.org/jira/browse/BEAM-5638
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-java-core
>            Reporter: Jeff Klukas
>            Assignee: Jeff Klukas
>            Priority: Minor
>              Labels: triaged
>   Original Estimate: 168h
>          Time Spent: 7h 50m
>  Remaining Estimate: 160h 10m
>
> Add methods to MapElements, FlatMapElements, and Filter that allow users to 
> specify expected exceptions and tuple tags to associate with the 
> collections of the successfully and unsuccessfully processed elements.
> See discussion on dev list:
> https://lists.apache.org/thread.html/936ed2a5f2c01be066fd903abf70130625e0b8cf4028c11b89b8b23f@%3Cdev.beam.apache.org%3E



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
