[ 
https://issues.apache.org/jira/browse/BEAM-5638?focusedWorklogId=151820&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-151820
 ]

ASF GitHub Bot logged work on BEAM-5638:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 05/Oct/18 19:37
            Start Date: 05/Oct/18 19:37
    Worklog Time Spent: 10m 
      Work Description: reuvenlax commented on a change in pull request #6586: 
[BEAM-5638] Exception handling for Java single message transforms
URL: https://github.com/apache/beam/pull/6586#discussion_r223118944
 
 

 ##########
 File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Failure.java
 ##########
 @@ -0,0 +1,101 @@
+package org.apache.beam.sdk.transforms;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ObjectArrays;
+import java.io.Serializable;
+import java.util.List;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.transforms.DoFn.MultiOutputReceiver;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * Wraps an exception along with an input value; this is the element type of 
failure collections
+ * returned by single message transforms configured to catch exceptions.
+ *
+ * @param <T> type of the wrapped input value that caused an exception to be 
raised
+ */
+@AutoValue
+public abstract class Failure<T> implements Serializable {
+  public static <T> Failure<T> of(Exception exception, T value) {
+    return new AutoValue_Failure<>(exception, value);
+  }
+
+  public abstract Exception exception();
+
+  public abstract T value();
+
+  /**
+   * Internal class for collecting tuple tags associated with collections of 
{@link Exception}
+   * classes that should route to them. Also contains helper methods to 
simplify implementation of
+   * the {@code WithFailures} nested classes of {@link MapElements}, {@link 
FlatMapElements}, etc.
+   */
+  @AutoValue
+  abstract static class TaggedExceptionsList<T> implements Serializable {
+    abstract ImmutableList<TupleTag<Failure<T>>> tags();
+
+    abstract ImmutableList<List<Class<?>>> exceptionLists();
+
+    static <T> TaggedExceptionsList<T> empty() {
+      return new AutoValue_Failure_TaggedExceptionsList<>(ImmutableList.of(), 
ImmutableList.of());
+    }
+
+    /**
+     * Return a new {@link TaggedExceptionsList} that has all the tags and 
exceptions of this {@link
+     * TaggedExceptionsList} plus a new element representing the arguments 
passed in here.
+     */
+    TaggedExceptionsList<T> and(
+        TupleTag<Failure<T>> tag, Class<?> exceptionToCatch, Class<?>[] 
additionalExceptions) {
+      final ImmutableList<TupleTag<Failure<T>>> newTags =
+          
ImmutableList.<TupleTag<Failure<T>>>builder().addAll(tags()).add(tag).build();
+      final ImmutableList<List<Class<?>>> newExceptionLists =
+          ImmutableList.<List<Class<?>>>builder()
+              .addAll(exceptionLists())
+              .add(
+                  ImmutableList.copyOf(ObjectArrays.concat(exceptionToCatch, 
additionalExceptions)))
+              .build();
+      return new AutoValue_Failure_TaggedExceptionsList<>(newTags, 
newExceptionLists);
+    }
+
+    /** Return the internal typed list of tags as an untyped {@link 
TupleTagList}. */
+    TupleTagList tupleTagList() {
+      TupleTagList l = TupleTagList.empty();
+      for (TupleTag<?> tag : tags()) {
+        l = l.and(tag);
+      }
+      return l;
+    }
+
+    /**
+     * Check the registered exception classes to see if the exception passed 
in here matches. If it
+     * does, wrap the exception and value together in a {@link Failure} and 
send to the output
+     * receiver. If not, rethrow so processing stops on the unexpected failure.
+     */
+    void outputOrRethrow(Exception e, T value, MultiOutputReceiver receiver) 
throws Exception {
+      for (int i = 0; i < tags().size(); i++) {
+        for (Class<?> cls : exceptionLists().get(i)) {
+          if (cls.isInstance(e)) {
+            receiver.get(tags().get(i)).output(Failure.of(e, value));
+            return;
+          }
+        }
+      }
+      throw e;
+    }
+
+    /**
+     * Set appropriate coders on all the failure collections in the given 
{@link PCollectionTuple}.
+     */
+    PCollectionTuple applyFailureCoders(PCollectionTuple pcs) {
+      final SerializableCoder<Failure<T>> failureCoder =
+          SerializableCoder.of(new TypeDescriptor<Failure<T>>() {});
 
 Review comment:
   Also keep in mind that T is not guaranteed to be Serializable, so 
SerializableCoder may not even work here.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

            Worklog Id:     (was: 151820)
            Time Spent: 1h 50m  (was: 1h 40m)
    Remaining Estimate: 166h 10m  (was: 166h 20m)

> Add exception handling to single message transforms in Java SDK
> ---------------------------------------------------------------
>
>                 Key: BEAM-5638
>                 URL: https://issues.apache.org/jira/browse/BEAM-5638
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-java-core
>            Reporter: Jeff Klukas
>            Assignee: Jeff Klukas
>            Priority: Minor
>   Original Estimate: 168h
>          Time Spent: 1h 50m
>  Remaining Estimate: 166h 10m
>
> Add methods to MapElements, FlatMapElements, and Filter that allow users to 
> specify expected exceptions and tuple tags to associate with the with 
> collections of the successfully and unsuccessfully processed elements.
> See discussion on dev list:
> https://lists.apache.org/thread.html/936ed2a5f2c01be066fd903abf70130625e0b8cf4028c11b89b8b23f@%3Cdev.beam.apache.org%3E



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to