[ https://issues.apache.org/jira/browse/BEAM-5638?focusedWorklogId=152412&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-152412 ]
ASF GitHub Bot logged work on BEAM-5638:
----------------------------------------
Author: ASF GitHub Bot
Created on: 08/Oct/18 21:28
Start Date: 08/Oct/18 21:28
Worklog Time Spent: 10m
Work Description: reuvenlax commented on a change in pull request #6586:
[BEAM-5638] Exception handling for Java single message transforms
URL: https://github.com/apache/beam/pull/6586#discussion_r223502268
##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Failure.java
##########
@@ -0,0 +1,101 @@
+package org.apache.beam.sdk.transforms;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ObjectArrays;
+import java.io.Serializable;
+import java.util.List;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.transforms.DoFn.MultiOutputReceiver;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * Wraps an exception along with an input value; this is the element type of failure collections
+ * returned by single message transforms configured to catch exceptions.
+ *
+ * @param <T> type of the wrapped input value that caused an exception to be raised
+ */
+@AutoValue
+public abstract class Failure<T> implements Serializable {
+ public static <T> Failure<T> of(Exception exception, T value) {
+ return new AutoValue_Failure<>(exception, value);
+ }
+
+ public abstract Exception exception();
+
+ public abstract T value();
+
+ /**
+ * Internal class for collecting tuple tags associated with collections of {@link Exception}
+ * classes that should route to them. Also contains helper methods to simplify implementation of
+ * the {@code WithFailures} nested classes of {@link MapElements}, {@link FlatMapElements}, etc.
+ */
+ @AutoValue
+ abstract static class TaggedExceptionsList<T> implements Serializable {
+ abstract ImmutableList<TupleTag<Failure<T>>> tags();
+
+ abstract ImmutableList<List<Class<?>>> exceptionLists();
+
+ static <T> TaggedExceptionsList<T> empty() {
+ return new AutoValue_Failure_TaggedExceptionsList<>(ImmutableList.of(), ImmutableList.of());
+ }
+
+ /**
+ * Return a new {@link TaggedExceptionsList} that has all the tags and exceptions of this {@link
+ * TaggedExceptionsList} plus a new element representing the arguments passed in here.
+ */
+ TaggedExceptionsList<T> and(
+ TupleTag<Failure<T>> tag, Class<?> exceptionToCatch, Class<?>[] additionalExceptions) {
+ final ImmutableList<TupleTag<Failure<T>>> newTags =
+ ImmutableList.<TupleTag<Failure<T>>>builder().addAll(tags()).add(tag).build();
+ final ImmutableList<List<Class<?>>> newExceptionLists =
+ ImmutableList.<List<Class<?>>>builder()
+ .addAll(exceptionLists())
+ .add(
+ ImmutableList.copyOf(ObjectArrays.concat(exceptionToCatch, additionalExceptions)))
+ .build();
+ return new AutoValue_Failure_TaggedExceptionsList<>(newTags, newExceptionLists);
+ }
+
+ /** Return the internal typed list of tags as an untyped {@link TupleTagList}. */
+ TupleTagList tupleTagList() {
+ TupleTagList l = TupleTagList.empty();
+ for (TupleTag<?> tag : tags()) {
+ l = l.and(tag);
+ }
+ return l;
+ }
+
+ /**
+ * Check the registered exception classes to see if the exception passed in here matches. If it
+ * does, wrap the exception and value together in a {@link Failure} and send to the output
+ * receiver. If not, rethrow so processing stops on the unexpected failure.
+ */
+ void outputOrRethrow(Exception e, T value, MultiOutputReceiver receiver) throws Exception {
+ for (int i = 0; i < tags().size(); i++) {
+ for (Class<?> cls : exceptionLists().get(i)) {
+ if (cls.isInstance(e)) {
+ receiver.get(tags().get(i)).output(Failure.of(e, value));
+ return;
+ }
+ }
+ }
+ throw e;
+ }
+
+ /**
+ * Set appropriate coders on all the failure collections in the given {@link PCollectionTuple}.
+ */
+ PCollectionTuple applyFailureCoders(PCollectionTuple pcs) {
+ final SerializableCoder<Failure<T>> failureCoder =
+ SerializableCoder.of(new TypeDescriptor<Failure<T>>() {});
Review comment:
BTW an onFailure that takes an OutputReceiver is not a horrible option. It's
also not mutually exclusive with the current approach.
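To make the suggestion concrete, here is a rough sketch of what an onFailure
callback that receives an output receiver could look like. It is plain Java and
not code from the pull request: OutputReceiver and OnFailure are simplified,
hypothetical stand-ins (the real Beam receiver type has more methods), and
mapWithFailures is an invented helper that uses in-memory lists in place of
PCollections.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

/** Sketch only: the nested interfaces are simplified stand-ins, not Beam types. */
final class OnFailureSketch {

  /** Hypothetical stand-in for an output receiver, reduced to a single method. */
  interface OutputReceiver<T> {
    void output(T element);
  }

  /** Hypothetical callback: decides what to emit for an input that threw, or rethrows. */
  interface OnFailure<InputT, FailureT> {
    void handle(Exception e, InputT input, OutputReceiver<FailureT> failures) throws Exception;
  }

  /** Applies fn to each input; any exception is handed to onFailure along with a receiver. */
  static <InputT, OutputT, FailureT> List<OutputT> mapWithFailures(
      List<InputT> inputs,
      Function<InputT, OutputT> fn,
      OnFailure<InputT, FailureT> onFailure,
      List<FailureT> failureSink)
      throws Exception {
    List<OutputT> results = new ArrayList<>();
    for (InputT input : inputs) {
      try {
        results.add(fn.apply(input));
      } catch (Exception e) {
        // The callback owns the decision: emit a failure record or rethrow.
        onFailure.handle(e, input, failureSink::add);
      }
    }
    return results;
  }

  public static void main(String[] args) throws Exception {
    Function<String, Integer> parse = Integer::parseInt;
    List<String> failures = new ArrayList<>();
    List<Integer> parsed =
        mapWithFailures(
            Arrays.asList("1", "x", "3"),
            parse,
            (e, input, out) -> {
              if (e instanceof NumberFormatException) {
                out.output(input); // expected failure: route the raw input
              } else {
                throw e; // unexpected failure: stop processing
              }
            },
            failures);
    System.out.println(parsed + " " + failures); // prints "[1, 3] [x]"
  }
}
```

In this shape the callback chooses the failure element type itself, which is
one way the two approaches could coexist: a tag-based API could be layered on
top by supplying a callback that wraps the exception and value together.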
----------------------------------------------------------------
Issue Time Tracking
-------------------
Worklog Id: (was: 152412)
Time Spent: 3.5h (was: 3h 20m)
Remaining Estimate: 164.5h (was: 164h 40m)
> Add exception handling to single message transforms in Java SDK
> ---------------------------------------------------------------
>
> Key: BEAM-5638
> URL: https://issues.apache.org/jira/browse/BEAM-5638
> Project: Beam
> Issue Type: Improvement
> Components: sdk-java-core
> Reporter: Jeff Klukas
> Assignee: Jeff Klukas
> Priority: Minor
> Original Estimate: 168h
> Time Spent: 3.5h
> Remaining Estimate: 164.5h
>
> Add methods to MapElements, FlatMapElements, and Filter that allow users to
> specify expected exceptions and tuple tags to associate with the
> collections of the successfully and unsuccessfully processed elements.
> See discussion on dev list:
> https://lists.apache.org/thread.html/936ed2a5f2c01be066fd903abf70130625e0b8cf4028c11b89b8b23f@%3Cdev.beam.apache.org%3E
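As a rough illustration of the routing described in the issue, the sketch below
mirrors the first-match-or-rethrow logic of outputOrRethrow from the diff in
plain Java. It is a simplification under stated assumptions: string names stand
in for TupleTags, in-memory lists stand in for the failure collections, and the
class ExceptionRoutingSketch and its failures() accessor are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Sketch only: string tags and lists stand in for TupleTags and PCollections. */
final class ExceptionRoutingSketch {
  // Insertion order is preserved so that the first registered match wins.
  private final Map<String, List<Class<?>>> exceptionsByTag = new LinkedHashMap<>();
  private final Map<String, List<String>> failuresByTag = new LinkedHashMap<>();

  /** Registers a tag together with the exception classes that should route to it. */
  ExceptionRoutingSketch and(String tag, Class<?>... exceptionsToCatch) {
    exceptionsByTag.put(tag, Arrays.asList(exceptionsToCatch));
    failuresByTag.put(tag, new ArrayList<>());
    return this;
  }

  /** Records the value under the first matching tag; rethrows if nothing matches. */
  void outputOrRethrow(Exception e, String value) throws Exception {
    for (Map.Entry<String, List<Class<?>>> entry : exceptionsByTag.entrySet()) {
      for (Class<?> cls : entry.getValue()) {
        if (cls.isInstance(e)) {
          failuresByTag.get(entry.getKey()).add(value);
          return;
        }
      }
    }
    throw e;
  }

  /** Hypothetical accessor for the values recorded under a tag. */
  List<String> failures(String tag) {
    return failuresByTag.get(tag);
  }

  public static void main(String[] args) throws Exception {
    ExceptionRoutingSketch router =
        new ExceptionRoutingSketch()
            .and("parseErrors", NumberFormatException.class)
            .and("otherErrors", IllegalArgumentException.class, IllegalStateException.class);
    for (String s : Arrays.asList("1", "x")) {
      try {
        Integer.parseInt(s);
      } catch (Exception e) {
        router.outputOrRethrow(e, s);
      }
    }
    // NumberFormatException also extends IllegalArgumentException, but the
    // earlier registration takes precedence.
    System.out.println(router.failures("parseErrors")); // prints "[x]"
  }
}
```

Because matching uses isInstance, registering a superclass before its subclass
would capture the subclass as well, so registration order matters in this
sketch.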