[ 
https://issues.apache.org/jira/browse/BEAM-5638?focusedWorklogId=200967&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-200967
 ]

ASF GitHub Bot logged work on BEAM-5638:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 19/Feb/19 23:10
            Start Date: 19/Feb/19 23:10
    Worklog Time Spent: 10m 
      Work Description: reuvenlax commented on pull request #7736: [BEAM-5638] 
Exception handling for Java MapElements and FlatMapElements
URL: https://github.com/apache/beam/pull/7736#discussion_r258251871
 
 

 ##########
 File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
 ##########
 @@ -170,4 +174,193 @@ public void populateDisplayData(DisplayData.Builder 
builder) {
       builder.include("fn", (HasDisplayData) originalFnForDisplayData);
     }
   }
+
+  /**
+   * Return a modified {@code PTransform} that catches exceptions raised while 
mapping elements.
+   *
+   * <p>The user must call {@code via} on the returned {@link 
FlatMapWithExceptions} instance to
+   * define an exception handler. If the handler does not provide sufficient 
type information, the
+   * user must also call {@code into} to define a type descriptor for the 
error collection.
+   *
+   * <p>See {@link WithExceptions} documentation for usage patterns of the 
returned {@link
+   * WithExceptions.Result}.
+   *
+   * @return a {@link WithExceptions.Result} wrapping the output and error 
collections
+   */
+  @Experimental(Experimental.Kind.WITH_EXCEPTIONS)
+  public FlatMapWithExceptions<InputT, OutputT, ?> withExceptions() {
+    return new FlatMapWithExceptions<>(
+        fn, originalFnForDisplayData, inputType, outputType, null, null);
+  }
+
+  /** Implementation of {@link FlatMapElements#withExceptions()}. */
+  @Experimental(Experimental.Kind.WITH_EXCEPTIONS)
+  public static class FlatMapWithExceptions<InputT, OutputT, FailureT>
+      extends PTransform<
+          PCollection<InputT>, WithExceptions.Result<PCollection<OutputT>, 
FailureT>> {
+
+    private final transient TypeDescriptor<InputT> inputType;
+    private final transient TypeDescriptor<OutputT> outputType;
+    @Nullable private final transient TypeDescriptor<FailureT> failureType;
+    private final transient Object originalFnForDisplayData;
+    @Nullable private final Contextful<Fn<InputT, Iterable<OutputT>>> fn;
+    @Nullable private final ProcessFunction<ExceptionElement<InputT>, 
FailureT> exceptionHandler;
+
+    FlatMapWithExceptions(
+        @Nullable Contextful<Fn<InputT, Iterable<OutputT>>> fn,
+        Object originalFnForDisplayData,
+        TypeDescriptor<InputT> inputType,
+        TypeDescriptor<OutputT> outputType,
+        @Nullable ProcessFunction<ExceptionElement<InputT>, FailureT> 
exceptionHandler,
+        @Nullable TypeDescriptor<FailureT> failureType) {
+      this.fn = fn;
+      this.originalFnForDisplayData = originalFnForDisplayData;
+      this.inputType = inputType;
+      this.outputType = outputType;
+      this.exceptionHandler = exceptionHandler;
+      this.failureType = failureType;
+    }
+
+    /**
+     * Returns a new {@link FlatMapWithExceptions} transform with the given 
type descriptor for the
+     * error collection, but the exception handler yet to be specified using 
{@link
+     * #via(ProcessFunction)}.
+     */
+    public <NewFailureT> FlatMapWithExceptions<InputT, OutputT, NewFailureT> 
into(
+        TypeDescriptor<NewFailureT> failureTypeDescriptor) {
+      return new FlatMapWithExceptions<>(
+          fn, originalFnForDisplayData, inputType, outputType, null, 
failureTypeDescriptor);
+    }
+
+    /**
+     * Returns a {@code PTransform} that catches exceptions raised while 
mapping elements, passing
+     * the raised exception instance and the input element being processed 
through the given {@code
+     * exceptionHandler} and emitting the result to an error collection.
+     *
+     * <p>Example usage:
+     *
+     * <pre>{@code
+     * Result<PCollection<String>, String> result = words.apply(
+     *     FlatMapElements.into(TypeDescriptors.strings())
+     *         .via((String line) -> 
Arrays.asList(Arrays.copyOfRange(line.split(" "), 1, 5)))
+     *         .withExceptions()
+     *         .into(TypeDescriptors.strings())
+     *         .via(ee -> ee.exception().getMessage()));
+     * PCollection<String> errors = result.errors();
+     * }</pre>
+     */
+    public FlatMapWithExceptions<InputT, OutputT, FailureT> via(
+        ProcessFunction<ExceptionElement<InputT>, FailureT> exceptionHandler) {
+      return new FlatMapWithExceptions<>(
+          fn, originalFnForDisplayData, inputType, outputType, 
exceptionHandler, failureType);
+    }
+
+    /**
+     * Like {@link #via(ProcessFunction)}, but takes advantage of the type 
information provided by
+     * {@link InferableFunction}, meaning that a call to {@link 
#into(TypeDescriptor)} may not be
+     * necessary.
+     *
+     * <p>Example usage:
+     *
+     * <pre>{@code
+     * Result<PCollection<Integer>, KV<String, Map<String, String>>> result = 
words.apply(
+     *     FlatMapElements.into(TypeDescriptors.strings())
+     *         .via((String line) -> 
Arrays.asList(Arrays.copyOfRange(line.split(" "), 1, 5)))
+     *         .withExceptions()
+     *         .via(new WithExceptions.ExceptionAsMapHandler<String>() {}));
+     * PCollection<KV<String, Map<String, String>>> errors = result.errors();
+     * }</pre>
+     */
+    public <NewFailureT> FlatMapWithExceptions<InputT, OutputT, NewFailureT> 
via(
+        InferableFunction<ExceptionElement<InputT>, NewFailureT> 
exceptionHandler) {
+      return new FlatMapWithExceptions<>(
+          fn,
+          originalFnForDisplayData,
+          inputType,
+          outputType,
+          exceptionHandler,
+          exceptionHandler.getOutputTypeDescriptor());
+    }
+
+    @Override
+    public WithExceptions.Result<PCollection<OutputT>, FailureT> 
expand(PCollection<InputT> input) {
+      final TupleTag<OutputT> outputTag = new TupleTag<OutputT>() {};
+      final TupleTag<FailureT> failureTag;
+      if (failureType == null) {
+        failureTag = new TupleTag<>();
+      } else {
+        failureTag =
+            new TupleTag<FailureT>() {
+              @Override
+              public TypeDescriptor<FailureT> getTypeDescriptor() {
+                return failureType;
+              }
+            };
+      }
+      DoFn<InputT, OutputT> doFn =
+          new DoFn<InputT, OutputT>() {
 
 Review comment:
   This is large enough that maybe we're better off pulling it into a static class?
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

            Worklog Id:     (was: 200967)
            Time Spent: 7h 40m  (was: 7.5h)
    Remaining Estimate: 160h 20m  (was: 160.5h)

> Add exception handling to single message transforms in Java SDK
> ---------------------------------------------------------------
>
>                 Key: BEAM-5638
>                 URL: https://issues.apache.org/jira/browse/BEAM-5638
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-java-core
>            Reporter: Jeff Klukas
>            Assignee: Jeff Klukas
>            Priority: Minor
>              Labels: triaged
>   Original Estimate: 168h
>          Time Spent: 7h 40m
>  Remaining Estimate: 160h 20m
>
> Add methods to MapElements, FlatMapElements, and Filter that allow users to 
> specify expected exceptions and tuple tags to associate with the 
> collections of the successfully and unsuccessfully processed elements.
> See discussion on dev list:
> https://lists.apache.org/thread.html/936ed2a5f2c01be066fd903abf70130625e0b8cf4028c11b89b8b23f@%3Cdev.beam.apache.org%3E



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to