[
https://issues.apache.org/jira/browse/BEAM-5638?focusedWorklogId=160774&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-160774
]
ASF GitHub Bot logged work on BEAM-5638:
----------------------------------------
Author: ASF GitHub Bot
Created on: 30/Oct/18 18:54
Start Date: 30/Oct/18 18:54
Worklog Time Spent: 10m
Work Description: jklukas closed pull request #6586: [BEAM-5638]
Exception handling for Java single message transforms
URL: https://github.com/apache/beam/pull/6586
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Failure.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Failure.java
new file mode 100644
index 00000000000..2a1d291889f
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Failure.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.collect.ImmutableList;
+import java.io.IOException;
+import java.io.Serializable;
+import java.lang.reflect.ParameterizedType;
+import java.util.List;
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.DelegateCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.DoFn.MultiOutputReceiver;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * Wraps an exception along with an input value; this is the element type of
failure collections
+ * returned by single message transforms configured to catch exceptions.
+ *
+ * @param <InputT> type of the wrapped input value that caused an exception to
be raised
+ */
+@AutoValue
+public abstract class Failure<ExceptionT extends Exception, InputT> implements
Serializable {
+ public static <ExceptionT extends Exception, InputT> Failure<ExceptionT,
InputT> of(
+ ExceptionT exception, InputT value) {
+ return new AutoValue_Failure<>(exception, value);
+ }
+
+ public abstract ExceptionT exception();
+
+ public abstract InputT value();
+
+ /** Provides factory {@link #of(Coder, Coder)} for creating coders for
{@link Failure}. */
+ public static class FailureCoder {
+
+ public static <ExceptionT extends Exception, InputT>
Coder<Failure<ExceptionT, InputT>> of(
+ Coder<ExceptionT> exceptionCoder, Coder<InputT> valueCoder) {
+ return DelegateCoder.of(
+ KvCoder.of(exceptionCoder, valueCoder),
+ new ToKvCodingFunction<>(),
+ new ToFailureCodingFunction<>());
+ }
+
+ private static final class ToKvCodingFunction<K extends Exception, V>
+ implements DelegateCoder.CodingFunction<Failure<K, V>, KV<K, V>> {
+ @Override
+ public KV<K, V> apply(Failure<K, V> failure) throws Exception {
+ return KV.of(failure.exception(), failure.value());
+ }
+ }
+
+ private static final class ToFailureCodingFunction<K extends Exception, V>
+ implements DelegateCoder.CodingFunction<KV<K, V>, Failure<K, V>> {
+ @Override
+ public Failure<K, V> apply(KV<K, V> kv) throws Exception {
+ return Failure.of(kv.getKey(), kv.getValue());
+ }
+ }
+ }
+
+ /**
+ * Internal class for storing a {@code TupleTag<Failure<ExceptionT,
InputT>>} and providing the
+ * necessary utilities for reflecting the exception type and generating an
appropriate coder.
+ *
+ * <p>Any action that needs to know about the concrete types ExceptionT or
InputT lives here.
+ */
+ @AutoValue
+ abstract static class FailureTag<ExceptionT extends Exception, InputT>
implements Serializable {
+ abstract TupleTag<Failure<ExceptionT, InputT>> tupleTag();
+
+ abstract List<Class<?>> exceptionList();
+
+ abstract Coder<ExceptionT> exceptionCoder();
+
+ static <ExceptionT extends Exception, InputT> FailureTag<ExceptionT,
InputT> of(
+ TupleTag<Failure<ExceptionT, InputT>> tag, Class<?>[]
exceptionsToCatch) {
+ final TypeDescriptor excType = getExceptionTypeDescriptor(tag);
+
+ List<Class<?>> exceptionList;
+ if (exceptionsToCatch.length > 0) {
+ for (Class<?> cls : exceptionsToCatch) {
+ if (!excType.getRawType().isAssignableFrom(cls)) {
+ throw new IllegalArgumentException(
+ "All passed exception classes must be subtypes"
+ + " of type parameter ExceptionT in the given"
+ + " TupleTag<Failure<ExceptionT, InputT>>, but "
+ + cls.toString()
+ + " is not a subtype of "
+ + excType.toString());
+ }
+ }
+ exceptionList = ImmutableList.copyOf(exceptionsToCatch);
+ } else {
+ exceptionList = ImmutableList.of(excType.getRawType());
+ }
+
+ Coder<ExceptionT> exceptionCoder = AvroCoder.of(excType);
+
+ // Exercise the coder early so that we can fail fast in case the given
exception type is
+ // not valid for AvroCoder.
+ try {
+ exceptionCoder.decode(null);
+ } catch (IOException | NullPointerException e) {
+ // We expect NullPointerException here due to null InputStream passed
to decode();
+ // if we get this far, our type is good and we can move on.
+ } catch (RuntimeException e) {
+ Throwable cause = e.getCause();
+ if (cause instanceof NoSuchMethodException) {
+ throw new IllegalArgumentException(
+ "Cannot transcode instances of "
+ + excType.toString()
+ + " with AvroCoder; you may want to choose ExceptionT in
TupleTag<Failure<ExceptionT, InputT>> to be a "
+ + " superclass that has a no-argument constructor",
+ e);
+ }
+ throw e;
+ }
+ return new AutoValue_Failure_FailureTag<>(tag, exceptionList,
exceptionCoder);
+ }
+
+ /** Return true if the passed exception is an instance of one of the
configured subclasses. */
+ boolean matches(Exception e) {
+ for (Class<?> cls : exceptionList()) {
+ if (cls.isInstance(e)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ void sendToOutput(Exception e, InputT value, MultiOutputReceiver receiver)
{
+ receiver.get(tupleTag()).output(Failure.of((ExceptionT) e, value));
+ }
+
+ void applyFailureCoder(Coder<InputT> valueCoder, PCollectionTuple pcs) {
+ pcs.get(tupleTag()).setCoder(FailureCoder.of(exceptionCoder(),
valueCoder));
+ }
+
+ static <ExceptionT extends Exception, InputT> TypeDescriptor
getExceptionTypeDescriptor(
+ TupleTag<Failure<ExceptionT, InputT>> tag) {
+ ParameterizedType tagType =
+ (ParameterizedType)
+
TypeDescriptor.of(tag.getClass()).getSupertype(TupleTag.class).getType();
+ ParameterizedType failureType;
+ try {
+ failureType =
+ (ParameterizedType)
TypeDescriptor.of(tagType.getActualTypeArguments()[0]).getType();
+ } catch (ClassCastException e) {
+ throw new IllegalArgumentException(
+ "Could not reflect type parameters from the passed"
+ + " TupleTag<Failure<ExceptionT, InputT>>, probably because
the tag was instantiated"
+ + " without ending curly braces; the instantiation should look
something like"
+ + " `new TupleTag<Failure<IOException, String>>(){}`.",
+ e);
+ }
+ return TypeDescriptor.of(failureType.getActualTypeArguments()[0]);
+ }
+ }
+
+ /**
+ * Internal class for collecting tuple tags associated with collections of
{@link Exception}
+ * classes that should route to them. Also contains helper methods to
simplify implementation of
+ * the {@code WithFailures} nested classes of {@link MapElements}, {@link
FlatMapElements}, etc.
+ */
+ @AutoValue
+ abstract static class FailureTagList<InputT> implements Serializable {
+ abstract List<FailureTag<?, InputT>> failureTags();
+
+ static <T> FailureTagList<T> empty() {
+ return new AutoValue_Failure_FailureTagList<>(ImmutableList.of());
+ }
+
+ /**
+ * Return a new {@link FailureTagList} that has all the tags and
exceptions of this {@link
+ * FailureTagList} plus a new element representing the arguments passed in
here.
+ */
+ <ExceptionT extends Exception> FailureTagList<InputT> and(
+ TupleTag<Failure<ExceptionT, InputT>> tag, Class<?>[]
exceptionsToCatch) {
+ final ImmutableList<FailureTag<?, InputT>> newFailureTags =
+ ImmutableList.<FailureTag<?, InputT>>builder()
+ .addAll(failureTags())
+ .add(FailureTag.of(tag, exceptionsToCatch))
+ .build();
+ return new AutoValue_Failure_FailureTagList<>(newFailureTags);
+ }
+
+ /** Return the internal typed list of tags as an untyped {@link
TupleTagList}. */
+ TupleTagList tags() {
+ TupleTagList l = TupleTagList.empty();
+ for (FailureTag<?, InputT> failureTag : failureTags()) {
+ l = l.and(failureTag.tupleTag());
+ }
+ return l;
+ }
+
+ /**
+ * Check the registered exception classes to see if the exception passed
in here matches. If it
+ * does, wrap the exception and value together in a {@link Failure} and
send to the output
+ * receiver. If not, rethrow so processing stops on the unexpected failure.
+ */
+ void outputOrRethrow(Exception e, InputT value, MultiOutputReceiver
receiver) throws Exception {
+ for (FailureTag<?, InputT> failureTag : failureTags()) {
+ if (failureTag.matches(e)) {
+ failureTag.sendToOutput(e, value, receiver);
+ return;
+ }
+ }
+ throw e;
+ }
+
+ /**
+ * Set appropriate coders on all the failure collections in the given
{@link PCollectionTuple}.
+ */
+ PCollectionTuple applyFailureCoders(Coder<InputT> valueCoder,
PCollectionTuple pcs) {
+ for (FailureTag<?, InputT> failureTag : failureTags()) {
+ failureTag.applyFailureCoder(valueCoder, pcs);
+ }
+ return pcs;
+ }
+ }
+}
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java
index 4bffeb6be3d..599807ba058 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java
@@ -17,8 +17,11 @@
*/
package org.apache.beam.sdk.transforms;
+import org.apache.beam.sdk.transforms.Failure.FailureTagList;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
/**
* {@code PTransform}s for filtering from a {@code PCollection} the elements
satisfying a predicate,
@@ -213,4 +216,73 @@ public void populateDisplayData(DisplayData.Builder
builder) {
super.populateDisplayData(builder);
builder.add(DisplayData.item("predicate",
predicateDescription).withLabel("Filter Predicate"));
}
+
  /**
   * Sets a {@link TupleTag} to associate with successes, converting this {@link PTransform} into
   * one that returns a {@link PCollectionTuple}. This allows you to make subsequent {@link
   * WithFailures#withFailureTag(TupleTag, Class[])} calls to capture thrown exceptions to failure
   * collections.
   *
   * @param successTag the tag under which elements passing the filter predicate will be emitted
   */
  public WithFailures withSuccessTag(TupleTag<T> successTag) {
    // Start with an empty failure-tag list; each withFailureTag() call adds a handler.
    return new WithFailures(successTag, FailureTagList.empty());
  }
+
+ /**
+ * Variant of {@link Filter} that that can handle exceptions and output one
or more failure
+ * collections wrapped in a {@link PCollectionTuple}. Specify how to handle
exceptions by calling
+ * {@link #withFailureTag(TupleTag, Class[])}.
+ */
+ public class WithFailures extends PTransform<PCollection<T>,
PCollectionTuple> {
+ private final TupleTag<T> successTag;
+ private final FailureTagList<T> failureTagList;
+
+ WithFailures(TupleTag<T> successTag, FailureTagList<T> failureTagList) {
+ this.successTag = successTag;
+ this.failureTagList = failureTagList;
+ }
+
+ /**
+ * Returns a modified {@link PTransform} that will catch exceptions of
type {@code ExceptionT}
+ * (as specified by the given {@code TupleTag<Failure<ExceptionT,
InputT>>}).
+ *
+ * <p>If you only want to catch specific subtypes of {@code ExceptionT},
you may pass class
+ * instances for those narrower types as additional parameters.
+ */
+ public <ExceptionT extends Exception> WithFailures withFailureTag(
+ TupleTag<Failure<ExceptionT, T>> tag, Class... exceptionsToCatch) {
+ return new WithFailures(successTag, failureTagList.and(tag,
exceptionsToCatch));
+ }
+
+ @Override
+ public PCollectionTuple expand(PCollection<T> input) {
+ PCollectionTuple pcs =
+ input.apply(
+ ParDo.of(
+ new DoFn<T, T>() {
+ @ProcessElement
+ public void processElement(@Element T element,
MultiOutputReceiver receiver)
+ throws Exception {
+ Boolean accepted = null;
+ try {
+ accepted = predicate.apply(element);
+ } catch (Exception e) {
+ failureTagList.outputOrRethrow(e, element,
receiver);
+ }
+ if (accepted != null && accepted) {
+ receiver.get(successTag).output(element);
+ }
+ }
+ })
+ .withOutputTags(successTag, failureTagList.tags()));
+ pcs.get(successTag).setCoder(input.getCoder());
+ return failureTagList.applyFailureCoders(input.getCoder(), pcs);
+ }
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
+ builder.add(
+ DisplayData.item("predicate",
predicateDescription).withLabel("Filter Predicate"));
+ }
+ }
}
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
index edf255affb6..1b404e0cef1 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
@@ -17,15 +17,19 @@
*/
package org.apache.beam.sdk.transforms;
-import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.transforms.Contextful.Fn;
+import org.apache.beam.sdk.transforms.Failure.FailureTagList;
import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
@@ -125,9 +129,19 @@ private FlatMapElements(
fn, fn.getClosure(), TypeDescriptors.inputOf(fn.getClosure()),
outputType);
}
+ private void checkOutputType() {
+ checkState(
+ outputType != null,
+ "%s output type descriptor was null; "
+ + "this probably means that getOutputTypeDescriptor() was called
after "
+ + "serialization/deserialization, but it is only available prior
to "
+ + "serialization, for constructing a pipeline and inferring
coders",
+ FlatMapElements.class.getSimpleName());
+ }
+
@Override
public PCollection<OutputT> expand(PCollection<? extends InputT> input) {
- checkArgument(fn != null, ".via() is required");
+ checkNotNull(fn, ".via() is required");
return input.apply(
"FlatMap",
ParDo.of(
@@ -148,13 +162,7 @@ public void processElement(ProcessContext c) throws
Exception {
@Override
public TypeDescriptor<OutputT> getOutputTypeDescriptor() {
- checkState(
- outputType != null,
- "%s output type descriptor was null; "
- + "this probably means that
getOutputTypeDescriptor() was called after "
- + "serialization/deserialization, but it is only
available prior to "
- + "serialization, for constructing a pipeline and
inferring coders",
- FlatMapElements.class.getSimpleName());
+ checkOutputType();
return outputType;
}
@@ -174,4 +182,99 @@ public void populateDisplayData(DisplayData.Builder
builder) {
builder.include("fn", (HasDisplayData) originalFnForDisplayData);
}
}
+
  /**
   * Sets a {@link TupleTag} to associate with successes, converting this {@link PTransform} into
   * one that returns a {@link PCollectionTuple}. This allows you to make subsequent {@link
   * WithFailures#withFailureTag(TupleTag, Class[])} calls to capture thrown exceptions to failure
   * collections.
   *
   * @param successTag the tag under which successfully-produced output elements will be emitted
   */
  public WithFailures withSuccessTag(TupleTag<OutputT> successTag) {
    // Start with an empty failure-tag list; each withFailureTag() call adds a handler.
    return new WithFailures(successTag, FailureTagList.empty());
  }
+
+ /**
+ * Variant of {@link FlatMapElements} that that can handle exceptions and
output one or more
+ * failure collections wrapped in a {@link PCollectionTuple}. Specify how to
handle exceptions by
+ * calling {@link #withFailureTag(TupleTag, Class[])}.
+ */
+ public class WithFailures extends PTransform<PCollection<InputT>,
PCollectionTuple> {
+ private final TupleTag<OutputT> successTag;
+ private final FailureTagList<InputT> failureTagList;
+
+ WithFailures(TupleTag<OutputT> successTag, FailureTagList<InputT>
failureTagList) {
+ this.successTag = successTag;
+ this.failureTagList = failureTagList;
+ }
+
+ /**
+ * Returns a modified {@link PTransform} that will catch exceptions of
type {@code ExceptionT}
+ * (as specified by the given {@code TupleTag<Failure<ExceptionT,
InputT>>}).
+ *
+ * <p>If you only want to catch specific subtypes of {@code ExceptionT},
you may pass class
+ * instances for those narrower types as additional parameters.
+ */
+ public <ExceptionT extends Exception> WithFailures withFailureTag(
+ TupleTag<Failure<ExceptionT, InputT>> tag, Class... exceptionsToCatch)
{
+ return new WithFailures(successTag, failureTagList.and(tag,
exceptionsToCatch));
+ }
+
+ @Override
+ public PCollectionTuple expand(PCollection<InputT> input) {
+ checkNotNull(fn, ".via() is required");
+ PCollectionTuple pcs =
+ input.apply(
+ "FlatMapWithFailures",
+ ParDo.of(
+ new DoFn<InputT, OutputT>() {
+ @ProcessElement
+ public void processElement(
+ @Element InputT element, MultiOutputReceiver
receiver, ProcessContext c)
+ throws Exception {
+ Iterable<OutputT> res = null;
+ try {
+ res =
+ fn.getClosure()
+ .apply(c.element(),
Fn.Context.wrapProcessContext(c));
+ } catch (Exception e) {
+ failureTagList.outputOrRethrow(e, element,
receiver);
+ }
+ if (res != null) {
+ for (OutputT output : res) {
+ c.output(output);
+ }
+ }
+ }
+
+ @Override
+ public void populateDisplayData(Builder builder) {
+ builder.delegate(WithFailures.this);
+ }
+
+ @Override
+ public TypeDescriptor<InputT> getInputTypeDescriptor()
{
+ return inputType;
+ }
+
+ @Override
+ public TypeDescriptor<OutputT>
getOutputTypeDescriptor() {
+ checkOutputType();
+ return outputType;
+ }
+ })
+ .withOutputTags(successTag, failureTagList.tags())
+ .withSideInputs(fn.getRequirements().getSideInputs()));
+ failureTagList.applyFailureCoders(input.getCoder(), pcs);
+ return pcs;
+ }
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
+ builder.add(DisplayData.item("class",
originalFnForDisplayData.getClass()));
+ if (originalFnForDisplayData instanceof HasDisplayData) {
+ builder.include("fn", (HasDisplayData) originalFnForDisplayData);
+ }
+ }
+ }
}
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
index 41aac41b948..c7b94d52bc2 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
@@ -24,9 +24,14 @@
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.transforms.Contextful.Fn;
+import org.apache.beam.sdk.transforms.Contextful.Fn.Context;
+import org.apache.beam.sdk.transforms.Failure.FailureTagList;
import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
@@ -114,6 +119,16 @@ private MapElements(
fn, fn.getClosure(), TypeDescriptors.inputOf(fn.getClosure()),
outputType);
}
+ private void checkOutputType() {
+ checkState(
+ outputType != null,
+ "%s output type descriptor was null; "
+ + "this probably means that getOutputTypeDescriptor() was called
after "
+ + "serialization/deserialization, but it is only available prior
to "
+ + "serialization, for constructing a pipeline and inferring
coders",
+ MapElements.class.getSimpleName());
+ }
+
@Override
public PCollection<OutputT> expand(PCollection<? extends InputT> input) {
checkNotNull(fn, "Must specify a function on MapElements using .via()");
@@ -141,13 +156,7 @@ public void populateDisplayData(DisplayData.Builder
builder) {
@Override
public TypeDescriptor<OutputT> getOutputTypeDescriptor() {
- checkState(
- outputType != null,
- "%s output type descriptor was null; "
- + "this probably means that
getOutputTypeDescriptor() was called after "
- + "serialization/deserialization, but it is only
available prior to "
- + "serialization, for constructing a pipeline and
inferring coders",
- MapElements.class.getSimpleName());
+ checkOutputType();
return outputType;
}
})
@@ -162,4 +171,93 @@ public void populateDisplayData(DisplayData.Builder
builder) {
builder.include("fn", (HasDisplayData) originalFnForDisplayData);
}
}
+
  /**
   * Sets a {@link TupleTag} to associate with successes, converting this {@link PTransform} into
   * one that returns a {@link PCollectionTuple}. This allows you to make subsequent {@link
   * WithFailures#withFailureTag(TupleTag, Class[])} calls to capture thrown exceptions to failure
   * collections.
   *
   * @param successTag the tag under which successfully-mapped output elements will be emitted
   */
  public WithFailures withSuccessTag(TupleTag<OutputT> successTag) {
    // Start with an empty failure-tag list; each withFailureTag() call adds a handler.
    return new WithFailures(successTag, FailureTagList.empty());
  }
+
+ /**
+ * Variant of {@link MapElements} that that can handle exceptions and output
one or more failure
+ * collections wrapped in a {@link PCollectionTuple}. Specify how to handle
exceptions by calling
+ * {@link #withFailureTag(TupleTag, Class[])}.
+ */
+ public class WithFailures extends PTransform<PCollection<InputT>,
PCollectionTuple> {
+ private final TupleTag<OutputT> successTag;
+ private final FailureTagList<InputT> failureTagList;
+
+ WithFailures(TupleTag<OutputT> successTag, FailureTagList<InputT>
failureTagList) {
+ this.successTag = successTag;
+ this.failureTagList = failureTagList;
+ }
+
+ /**
+ * Returns a modified {@link PTransform} that will catch exceptions of
type {@code ExceptionT}
+ * (as specified by the given {@code TupleTag<Failure<ExceptionT,
InputT>>}).
+ *
+ * <p>If you only want to catch specific subtypes of {@code ExceptionT},
you may pass class
+ * instances for those narrower types as additional parameters.
+ */
+ public <ExceptionT extends Exception> WithFailures withFailureTag(
+ TupleTag<Failure<ExceptionT, InputT>> tag, Class... exceptionsToCatch)
{
+ return new WithFailures(successTag, failureTagList.and(tag,
exceptionsToCatch));
+ }
+
+ @Override
+ public PCollectionTuple expand(PCollection<InputT> input) {
+ checkNotNull(fn, "Must specify a function on MapElements using .via()");
+ PCollectionTuple pcs =
+ input.apply(
+ "MapWithFailures",
+ ParDo.of(
+ new DoFn<InputT, OutputT>() {
+ @ProcessElement
+ public void processElement(
+ @Element InputT element, MultiOutputReceiver
receiver, ProcessContext c)
+ throws Exception {
+ try {
+ receiver
+ .get(successTag)
+ .output(
+ fn.getClosure().apply(element,
Context.wrapProcessContext(c)));
+ } catch (Exception e) {
+ failureTagList.outputOrRethrow(e, element,
receiver);
+ }
+ }
+
+ @Override
+ public void populateDisplayData(Builder builder) {
+ builder.delegate(WithFailures.this);
+ }
+
+ @Override
+ public TypeDescriptor<InputT> getInputTypeDescriptor()
{
+ return inputType;
+ }
+
+ @Override
+ public TypeDescriptor<OutputT>
getOutputTypeDescriptor() {
+ checkOutputType();
+ return outputType;
+ }
+ })
+ .withOutputTags(successTag, failureTagList.tags())
+ .withSideInputs(fn.getRequirements().getSideInputs()));
+ return failureTagList.applyFailureCoders(input.getCoder(), pcs);
+ }
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
+ builder.add(DisplayData.item("class",
originalFnForDisplayData.getClass()));
+ if (originalFnForDisplayData instanceof HasDisplayData) {
+ builder.include("fn", (HasDisplayData) originalFnForDisplayData);
+ }
+ }
+ }
}
diff --git
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FailureTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FailureTest.java
new file mode 100644
index 00000000000..3be7c94efb5
--- /dev/null
+++
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FailureTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+import java.io.UncheckedIOException;
+import org.apache.beam.sdk.transforms.Failure.FailureTag;
+import org.apache.beam.sdk.values.TupleTag;
+import org.hamcrest.Matchers;
+import org.junit.Test;
+
/** Test constraints and internal behavior of {@link Failure}. */
public class FailureTest {

  // Creating the tag without ending curly braces erases its type parameters, so FailureTag.of
  // cannot reflect ExceptionT and must reject the tag.
  @Test(expected = IllegalArgumentException.class)
  public void testFailureTagThrowsForErasedType() {
    TupleTag<Failure<Exception, String>> tag = new TupleTag<>();
    FailureTag.of(tag, new Class<?>[] {});
  }

  // UncheckedIOException has no no-argument constructor, so AvroCoder cannot transcode it and
  // FailureTag.of should fail fast with an IllegalArgumentException.
  @Test(expected = IllegalArgumentException.class)
  public void testFailureTagThrowsForConstructor() {
    TupleTag<Failure<UncheckedIOException, String>> tag =
        new TupleTag<Failure<UncheckedIOException, String>>() {};
    FailureTag.of(tag, new Class<?>[] {});
  }

  // Exception is a supertype (not a subtype) of the tag's ExceptionT = RuntimeException, so it
  // may not be passed as a class to catch.
  @Test(expected = IllegalArgumentException.class)
  public void testThrowsOnDisjointTypes() {
    TupleTag<Failure<RuntimeException, String>> tag =
        new TupleTag<Failure<RuntimeException, String>>() {};
    FailureTag.of(tag, new Class<?>[] {Exception.class});
  }

  // Narrowing the catch set to subtypes of ExceptionT is allowed and should not throw.
  @Test
  public void testAllowsNarrowingViaSubtypes() {
    TupleTag<Failure<RuntimeException, String>> tag =
        new TupleTag<Failure<RuntimeException, String>>() {};
    FailureTag.of(tag, new Class<?>[] {IllegalArgumentException.class, NullPointerException.class});
  }

  // With no narrowing classes given, the tag defaults to catching ExceptionT itself.
  @Test
  public void testSuppliesDefaultClass() {
    TupleTag<Failure<RuntimeException, String>> tag =
        new TupleTag<Failure<RuntimeException, String>>() {};
    FailureTag<RuntimeException, String> failureTag = FailureTag.of(tag, new Class<?>[] {});
    assertThat(failureTag.exceptionList(), Matchers.contains(RuntimeException.class));
  }

  // Reflection should recover the concrete ExceptionT class from a properly-instantiated tag.
  @Test
  public void testGetExceptionTypeDescriptor() {
    TupleTag<Failure<Exception, String>> tag = new TupleTag<Failure<Exception, String>>() {};
    assertEquals(Exception.class, FailureTag.getExceptionTypeDescriptor(tag).getRawType());
  }
}
diff --git
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FilterTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FilterTest.java
index a05b7ebea5c..75c8a81b426 100644
---
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FilterTest.java
+++
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FilterTest.java
@@ -19,14 +19,19 @@
import static
org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.isA;
+import static org.junit.Assert.assertEquals;
import java.io.Serializable;
+import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -216,4 +221,80 @@ public boolean isEven(int i) {
return i % 2 == 0;
}
}
+
  // Elements 1 and 2 make EvenFnWithExceptions throw IntegerException1/IntegerException2
  // respectively; those elements should land in their failure collections while the remaining
  // even elements (4, 6) pass the filter.
  @Test
  @Category(NeedsRunner.class)
  public void testFilterWithFailures() {
    TupleTag<Integer> successTag = new TupleTag<Integer>() {};
    TupleTag<Failure<IntegerException1, Integer>> failureTag1 =
        new TupleTag<Failure<IntegerException1, Integer>>() {};
    TupleTag<Failure<IntegerException2, Integer>> failureTag2 =
        new TupleTag<Failure<IntegerException2, Integer>>() {};

    PCollectionTuple pcs =
        p.apply(Create.of(1, 2, 3, 4, 5, 6, 7))
            .apply(
                Filter.by(new EvenFnWithExceptions())
                    .withSuccessTag(successTag)
                    .withFailureTag(failureTag1, IntegerException1.class)
                    .withFailureTag(failureTag2, IntegerException2.class));

    PAssert.that(pcs.get(successTag)).containsInAnyOrder(4, 6);
    // Every Failure routed to failureTag1 must carry the input value 1 (the only element that
    // throws IntegerException1).
    PAssert.that(pcs.get(failureTag1))
        .satisfies(
            failures -> {
              failures.forEach(
                  (Failure<IntegerException1, Integer> failure) -> {
                    assertEquals(1, failure.value().intValue());
                  });
              return null;
            });
    // Likewise, failureTag2 should contain only the value 2.
    PAssert.that(pcs.get(failureTag2))
        .satisfies(
            failures -> {
              failures.forEach(
                  (Failure<IntegerException2, Integer> failure) -> {
                    assertEquals(2, failure.value().intValue());
                  });
              return null;
            });

    p.run();
  }
+
  // Only IntegerException1 is registered, so the IntegerException2 thrown for element 2 is
  // rethrown by outputOrRethrow and surfaces as the cause of a PipelineExecutionException.
  // NOTE(review): `thrown` is presumably the test class's ExpectedException rule, declared
  // outside this diff hunk — confirm.
  @Test
  @Category(NeedsRunner.class)
  public void testFilterWithFailuresThrowsUncaught() {
    TupleTag<Integer> successTag = new TupleTag<Integer>() {};
    TupleTag<Failure<IntegerException1, Integer>> failureTag1 =
        new TupleTag<Failure<IntegerException1, Integer>>() {};

    PCollectionTuple pcs =
        p.apply(Create.of(1, 2, 3, 4, 5, 6, 7))
            .apply(
                Filter.by(new EvenFnWithExceptions())
                    .withSuccessTag(successTag)
                    .withFailureTag(failureTag1, IntegerException1.class));

    thrown.expect(PipelineExecutionException.class);
    thrown.expectCause(isA(IntegerException2.class));
    p.run();
  }
+
+ static class EvenFnWithExceptions implements SerializableFunction<Integer,
Boolean> {
+ @Override
+ public Boolean apply(Integer elem) {
+ if (elem == 1) {
+ throw new IntegerException1();
+ }
+ if (elem == 2) {
+ throw new IntegerException2();
+ }
+ return elem % 2 == 0;
+ }
+ }
+
  /** Thrown by {@link EvenFnWithExceptions} when the element equals 1. */
  private static class IntegerException1 extends RuntimeException {}

  /** Thrown by {@link EvenFnWithExceptions} when the element equals 2. */
  private static class IntegerException2 extends RuntimeException {}
}
diff --git
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java
index 061765aa5b2..17a745072e0 100644
---
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java
+++
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java
@@ -22,6 +22,8 @@
import static
org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
import static org.apache.beam.sdk.values.TypeDescriptors.integers;
import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.isA;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import com.google.common.collect.ImmutableList;
@@ -30,6 +32,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Set;
+import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
@@ -37,7 +40,9 @@
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.junit.Rule;
@@ -271,4 +276,95 @@ public void testFlatMapMethodReference() throws Exception {
return ImmutableList.of(input, -input);
}
}
+
+ /** Basic test of {@link FlatMapElements.WithFailures}. */
+ @Test
+ @Category(NeedsRunner.class)
+ public void testFlatMapWithErrors() throws Exception {
+ TupleTag<Integer> successTag = new TupleTag<>();
+ TupleTag<Failure<IntegerException1, Integer>> failureTag1 =
+ new TupleTag<Failure<IntegerException1, Integer>>() {};
+ TupleTag<Failure<IntegerException2, Integer>> failureTag2 =
+ new TupleTag<Failure<IntegerException2, Integer>>() {};
+
+ PCollectionTuple pcs =
+ pipeline
+ .apply(Create.of(1, 2, 3, 4))
+ .apply(
+ FlatMapElements
+ // Note that the input type annotation is required.
+ .into(TypeDescriptors.integers())
+ .via(
+ (Integer i) -> {
+ if (i == 1) {
+ throw new IntegerException1();
+ }
+ if (i == 2) {
+ throw new IntegerException2();
+ }
+ return ImmutableList.of(i, -i);
+ })
+ .withSuccessTag(successTag)
+ .withFailureTag(failureTag1)
+ .withFailureTag(failureTag2));
+
+ PAssert.that(pcs.get(successTag)).containsInAnyOrder(3, -4, -3, 4);
+ PAssert.that(pcs.get(failureTag1))
+ .satisfies(
+ failures -> {
+ failures.forEach(
+ (Failure<IntegerException1, Integer> failure) -> {
+ assertEquals(1, failure.value().intValue());
+ });
+ return null;
+ });
+ PAssert.that(pcs.get(failureTag2))
+ .satisfies(
+ failures -> {
+ failures.forEach(
+ (Failure<IntegerException2, Integer> failure) -> {
+ assertEquals(2, failure.value().intValue());
+ });
+ return null;
+ });
+
+ pipeline.run();
+ }
+
+ /** Test of {@link FlatMapElements.WithFailures} when a thrown exception has no matching failure tag. */
+ @Test
+ @Category(NeedsRunner.class)
+ public void testFlatMapWithErrorsThrowsUncaught() throws Exception {
+ TupleTag<Integer> successTag = new TupleTag<>();
+ TupleTag<Failure<IntegerException1, Integer>> failureTag1 =
+ new TupleTag<Failure<IntegerException1, Integer>>() {};
+
+ PCollectionTuple pcs =
+ pipeline
+ .apply(Create.of(1, 2, 3, 4))
+ .apply(
+ FlatMapElements
+ // Note that the input type annotation is required.
+ .into(TypeDescriptors.integers())
+ .via(
+ (Integer i) -> {
+ if (i == 1) {
+ throw new IntegerException1();
+ }
+ if (i == 2) {
+ throw new IntegerException2();
+ }
+ return ImmutableList.of(i, -i);
+ })
+ .withSuccessTag(successTag)
+ .withFailureTag(failureTag1));
+
+ thrown.expect(PipelineExecutionException.class);
+ thrown.expectCause(isA(IntegerException2.class));
+ pipeline.run();
+ }
+
+ private static class IntegerException1 extends RuntimeException {}
+
+ private static class IntegerException2 extends RuntimeException {}
}
diff --git
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
index c10b06e0fa6..610ee969bcd 100644
---
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
+++
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
@@ -23,10 +23,13 @@
import static org.apache.beam.sdk.values.TypeDescriptors.integers;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.core.Is.isA;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import java.io.Serializable;
import java.util.Set;
+import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
@@ -36,7 +39,9 @@
import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.junit.Rule;
@@ -360,4 +365,95 @@ public int doubleIt(int val) {
return val * 2;
}
}
+
+ /** Basic test of {@link MapElements.WithFailures}. */
+ @Test
+ @Category(NeedsRunner.class)
+ public void testMapWithFailures() throws Exception {
+ TupleTag<Integer> successTag = new TupleTag<>();
+ TupleTag<Failure<IntegerException1, Integer>> failureTag1 =
+ new TupleTag<Failure<IntegerException1, Integer>>() {};
+ TupleTag<Failure<IntegerException2, Integer>> failureTag2 =
+ new TupleTag<Failure<IntegerException2, Integer>>() {};
+
+ PCollectionTuple pcs =
+ pipeline
+ .apply(Create.of(1, 2, 3, 4))
+ .apply(
+ MapElements
+ // Note that the type annotation is required.
+ .into(TypeDescriptors.integers())
+ .via(
+ (Integer i) -> {
+ if (i == 1) {
+ throw new IntegerException1();
+ }
+ if (i == 2) {
+ throw new IntegerException2();
+ }
+ return i * 2;
+ })
+ .withSuccessTag(successTag)
+ .withFailureTag(failureTag1, IntegerException1.class)
+ .withFailureTag(failureTag2, IntegerException2.class));
+
+ PAssert.that(pcs.get(successTag)).containsInAnyOrder(6, 8);
+ PAssert.that(pcs.get(failureTag1))
+ .satisfies(
+ failures -> {
+ failures.forEach(
+ (Failure<IntegerException1, Integer> failure) -> {
+ assertEquals(1, failure.value().intValue());
+ });
+ return null;
+ });
+ PAssert.that(pcs.get(failureTag2))
+ .satisfies(
+ failures -> {
+ failures.forEach(
+ (Failure<IntegerException2, Integer> failure) -> {
+ assertEquals(2, failure.value().intValue());
+ });
+ return null;
+ });
+
+ pipeline.run();
+ }
+
+ /** Test of {@link MapElements.WithFailures} when a thrown exception has no matching failure tag. */
+ @Test
+ @Category(NeedsRunner.class)
+ public void testMapWithFailuresThrowsUncaught() throws Exception {
+ TupleTag<Integer> successTag = new TupleTag<Integer>() {};
+ TupleTag<Failure<IntegerException1, Integer>> failureTag1 =
+ new TupleTag<Failure<IntegerException1, Integer>>() {};
+
+ PCollectionTuple pcs =
+ pipeline
+ .apply(Create.of(1, 2, 3, 4))
+ .apply(
+ MapElements
+ // Note that the type annotation is required.
+ .into(TypeDescriptors.integers())
+ .via(
+ (Integer i) -> {
+ if (i == 1) {
+ throw new IntegerException1();
+ }
+ if (i == 2) {
+ throw new IntegerException2();
+ }
+ return i * 2;
+ })
+ .withSuccessTag(successTag)
+ .withFailureTag(failureTag1, IntegerException1.class));
+
+ thrown.expect(PipelineExecutionException.class);
+ thrown.expectCause(isA(IntegerException2.class));
+ pipeline.run();
+ }
+
+ private static class IntegerException1 extends RuntimeException {}
+
+ private static class IntegerException2 extends RuntimeException {}
}
diff --git
a/sdks/java/extensions/jackson/src/main/java/org/apache/beam/sdk/extensions/jackson/AsJsons.java
b/sdks/java/extensions/jackson/src/main/java/org/apache/beam/sdk/extensions/jackson/AsJsons.java
index 9a0ea09f6d5..5c26c4e7959 100644
---
a/sdks/java/extensions/jackson/src/main/java/org/apache/beam/sdk/extensions/jackson/AsJsons.java
+++
b/sdks/java/extensions/jackson/src/main/java/org/apache/beam/sdk/extensions/jackson/AsJsons.java
@@ -18,12 +18,19 @@
package org.apache.beam.sdk.extensions.jackson;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Optional;
import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Optional;
+import org.apache.beam.sdk.transforms.Contextful;
+import org.apache.beam.sdk.transforms.Failure;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.Requirements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TypeDescriptors;
/**
* {@link PTransform} for serializing objects to JSON {@link String Strings}.
Transforms a {@code
@@ -32,6 +39,7 @@
*/
public class AsJsons<InputT> extends PTransform<PCollection<InputT>,
PCollection<String>> {
private static final ObjectMapper DEFAULT_MAPPER = new ObjectMapper();
+ private static final TupleTag<String> SUCCESS_TAG = new TupleTag<String>()
{};
private final Class<? extends InputT> inputClass;
private ObjectMapper customMapper;
@@ -41,12 +49,12 @@
* into a {@link PCollection} of JSON {@link String Strings} representing
those objects using a
* Jackson {@link ObjectMapper}.
*/
- public static <OutputT> AsJsons<OutputT> of(Class<? extends OutputT>
outputClass) {
- return new AsJsons<>(outputClass);
+ public static <InputT> AsJsons<InputT> of(Class<? extends InputT>
inputClass) {
+ return new AsJsons<>(inputClass);
}
- private AsJsons(Class<? extends InputT> outputClass) {
- this.inputClass = outputClass;
+ private AsJsons(Class<? extends InputT> inputClass) {
+ this.inputClass = inputClass;
}
/** Use custom Jackson {@link ObjectMapper} instead of the default one. */
@@ -56,6 +64,11 @@ private AsJsons(Class<? extends InputT> outputClass) {
return newTransform;
}
+ private String writeValue(InputT input) throws IOException {
+ ObjectMapper mapper =
Optional.ofNullable(customMapper).orElse(DEFAULT_MAPPER);
+ return mapper.writeValueAsString(input);
+ }
+
@Override
public PCollection<String> expand(PCollection<InputT> input) {
return input.apply(
@@ -64,13 +77,76 @@ private AsJsons(Class<? extends InputT> outputClass) {
@Override
public String apply(InputT input) {
try {
- ObjectMapper mapper =
Optional.fromNullable(customMapper).or(DEFAULT_MAPPER);
- return mapper.writeValueAsString(input);
+ return writeValue(input);
} catch (IOException e) {
- throw new RuntimeException(
+ throw new UncheckedIOException(
"Failed to serialize " + inputClass.getName() + " value:
" + input, e);
}
}
}));
}
+
+ /** Return the {@link TupleTag} associated with successfully written JSON
strings. */
+ public static TupleTag<String> successTag() {
+ return SUCCESS_TAG;
+ }
+
+ /**
+ * Sets a {@link TupleTag} to associate with serialization failures,
converting this {@link
+ * PTransform} into one that returns a {@link PCollectionTuple}.
+ *
+ * <p>Because the success output type is static ({@code String}), users do
not need to supply a
+ * success tag. Instead, successes are always associated with the tag
returned by static method
+ * {@link AsJsons#successTag()}.
+ *
+ * <p>Example:
+ *
+ * <pre>{@code
+ * PCollection<Foo> foos = ...;
+ * TupleTag<Failure<IOException, Foo>> unserializedFoosTag =
+ * new TupleTag<Failure<IOException, Foo>>(){};
+ *
+ * PCollection<String> strings = foos
+ * .apply(AsJsons
+ * .of(Foo.class)
+ * .withFailureTag(unserializedFoosTag))
+ * .apply(PTransform.compose((PCollectionTuple pcs) -> {
+ * pcs.get(unserializedFoosTag).apply(new MyErrorOutputTransform());
+ * return pcs.get(AsJsons.successTag());
+ * }));
+ * }</pre>
+ */
+ public WithFailures withFailureTag(TupleTag<Failure<IOException, InputT>>
failureTag) {
+ return new WithFailures(failureTag);
+ }
+
+ /**
+ * Variant of {@link AsJsons} that catches {@link IOException} raised
while writing JSON,
+ * wrapping success and failure collections in a {@link PCollectionTuple}.
+ */
+ public class WithFailures extends PTransform<PCollection<InputT>,
PCollectionTuple> {
+ private final TupleTag<Failure<IOException, InputT>> failureTag;
+
+ WithFailures(TupleTag<Failure<IOException, InputT>> failureTag) {
+ this.failureTag = failureTag;
+ }
+
+ @Override
+ public PCollectionTuple expand(PCollection<InputT> input) {
+ return input.apply(
+ MapElements.into(TypeDescriptors.strings())
+ .via(
+ Contextful.fn(
+ new Contextful.Fn<InputT, String>() {
+ @Override
+ public String apply(InputT input,
Contextful.Fn.Context c)
+ throws IOException {
+ return writeValue(input);
+ }
+ },
+ Requirements.empty()))
+ .withSuccessTag(SUCCESS_TAG)
+ .withFailureTag(failureTag));
+ }
+ }
}
diff --git
a/sdks/java/extensions/jackson/src/main/java/org/apache/beam/sdk/extensions/jackson/ParseJsons.java
b/sdks/java/extensions/jackson/src/main/java/org/apache/beam/sdk/extensions/jackson/ParseJsons.java
index 4844836f893..e39600cf1d6 100644
---
a/sdks/java/extensions/jackson/src/main/java/org/apache/beam/sdk/extensions/jackson/ParseJsons.java
+++
b/sdks/java/extensions/jackson/src/main/java/org/apache/beam/sdk/extensions/jackson/ParseJsons.java
@@ -18,12 +18,19 @@
package org.apache.beam.sdk.extensions.jackson;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Optional;
import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Optional;
+import org.apache.beam.sdk.transforms.Contextful;
+import org.apache.beam.sdk.transforms.Failure;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.Requirements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TypeDescriptor;
/**
* {@link PTransform} for parsing JSON {@link String Strings}. Parse {@link
PCollection} of {@link
@@ -32,6 +39,8 @@
*/
public class ParseJsons<OutputT> extends PTransform<PCollection<String>,
PCollection<OutputT>> {
private static final ObjectMapper DEFAULT_MAPPER = new ObjectMapper();
+ private static final TupleTag<Failure<IOException, String>> FAILURE_TAG =
+ new TupleTag<Failure<IOException, String>>() {};
private final Class<? extends OutputT> outputClass;
private ObjectMapper customMapper;
@@ -55,6 +64,11 @@ private ParseJsons(Class<? extends OutputT> outputClass) {
return newTransform;
}
+ private OutputT readValue(String input) throws IOException {
+ ObjectMapper mapper =
Optional.ofNullable(customMapper).orElse(DEFAULT_MAPPER);
+ return mapper.readValue(input, outputClass);
+ }
+
@Override
public PCollection<OutputT> expand(PCollection<String> input) {
return input.apply(
@@ -63,14 +77,77 @@ private ParseJsons(Class<? extends OutputT> outputClass) {
@Override
public OutputT apply(String input) {
try {
- ObjectMapper mapper =
Optional.fromNullable(customMapper).or(DEFAULT_MAPPER);
- return mapper.readValue(input, outputClass);
+ return readValue(input);
} catch (IOException e) {
- throw new RuntimeException(
+ throw new UncheckedIOException(
"Failed to parse a " + outputClass.getName() + " from
JSON value: " + input,
e);
}
}
}));
}
+
+ /** Return the {@link TupleTag} associated with parsing failures. */
+ public static TupleTag<Failure<IOException, String>> failureTag() {
+ return FAILURE_TAG;
+ }
+
+ /**
+ * Sets a {@link TupleTag} to associate with successes, converting this
{@link PTransform} into
+ * one that returns a {@link PCollectionTuple} and catches any {@link
IOException} raised while
+ * parsing JSON.
+ *
+ * <p>Because the input type is static ({@code String}), the failure type is
also static and users
+ * do not need to supply a failure tag. Instead, failures are always
associated with the tag
+ * returned by static method {@link ParseJsons#failureTag()}.
+ *
+ * <p>Example:
+ *
+ * <pre>{@code
+ * PCollection<String> strings = ...;
+ * TupleTag<Foo> parsedFoosTag = new TupleTag<>();
+ *
+ * PCollection<Foo> foos = strings
+ * .apply(ParseJsons
+ * .of(Foo.class)
+ * .withSuccessTag(parsedFoosTag))
+ * .apply(PTransform.compose((PCollectionTuple pcs) -> {
+ * pcs.get(ParseJsons.failureTag()).apply(new MyErrorOutputTransform());
+ * return pcs.get(parsedFoosTag);
+ * }));
+ * }</pre>
+ */
+ public WithFailures withSuccessTag(TupleTag<OutputT> successTag) {
+ return new WithFailures(successTag);
+ }
+
+ /**
+ * Variant of {@link ParseJsons} that catches {@link IOException}
raised while parsing JSON,
+ * wrapping success and failure collections in a {@link PCollectionTuple}.
+ */
+ public class WithFailures extends PTransform<PCollection<String>,
PCollectionTuple> {
+ private final TupleTag<OutputT> successTag;
+
+ public WithFailures(TupleTag<OutputT> successTag) {
+ this.successTag = successTag;
+ }
+
+ @Override
+ public PCollectionTuple expand(PCollection<String> input) {
+ return input.apply(
+ MapElements.into(new TypeDescriptor<OutputT>() {})
+ .via(
+ Contextful.fn(
+ new Contextful.Fn<String, OutputT>() {
+ @Override
+ public OutputT apply(String input,
Contextful.Fn.Context c)
+ throws IOException {
+ return readValue(input);
+ }
+ },
+ Requirements.empty()))
+ .withSuccessTag(successTag)
+ .withFailureTag(FAILURE_TAG));
+ }
+ }
}
diff --git
a/sdks/java/extensions/jackson/src/test/java/org/apache/beam/sdk/extensions/jackson/JacksonTransformsTest.java
b/sdks/java/extensions/jackson/src/test/java/org/apache/beam/sdk/extensions/jackson/JacksonTransformsTest.java
index d78f12cb5bd..742aa2dbcfe 100644
---
a/sdks/java/extensions/jackson/src/test/java/org/apache/beam/sdk/extensions/jackson/JacksonTransformsTest.java
+++
b/sdks/java/extensions/jackson/src/test/java/org/apache/beam/sdk/extensions/jackson/JacksonTransformsTest.java
@@ -17,10 +17,13 @@
*/
package org.apache.beam.sdk.extensions.jackson;
+import static org.junit.Assert.assertTrue;
+
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.google.common.collect.Iterables;
+import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
@@ -30,12 +33,18 @@
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.Failure;
+import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeDescriptors;
import org.junit.Rule;
import org.junit.Test;
/** Test Jackson transforms {@link ParseJsons} and {@link AsJsons}. */
-public class JacksonTransformsTest {
+public class JacksonTransformsTest implements Serializable {
private static final List<String> VALID_JSONS =
Arrays.asList("{\"myString\":\"abc\",\"myInt\":3}",
"{\"myString\":\"def\",\"myInt\":4}");
@@ -82,6 +91,36 @@ public void failParsingInvalidJsons() {
pipeline.run();
}
+ @Test
+ public void collectInvalidJsons() {
+ TupleTag<MyPojo> successTag = new TupleTag<>();
+
+ PCollectionTuple pcs =
+ pipeline
+ .apply(Create.of(Iterables.concat(VALID_JSONS, INVALID_JSONS)))
+ .apply(ParseJsons.of(MyPojo.class).withSuccessTag(successTag));
+
+ pcs.get(successTag).setCoder(SerializableCoder.of(MyPojo.class));
+
+ PAssert.that(pcs.get(successTag)).containsInAnyOrder(POJOS);
+ PAssert.that(pcs.get(ParseJsons.failureTag()))
+ .satisfies(
+ failures -> {
+ failures.forEach(
+ (Failure<IOException, String> failure) -> {
+ assertTrue(failure.exception() instanceof IOException);
+ });
+ return null;
+ });
+
+ PCollection<String> failureValues =
+ pcs.get(ParseJsons.failureTag())
+
.apply(MapElements.into(TypeDescriptors.strings()).via(Failure::value));
+ PAssert.that(failureValues).containsInAnyOrder(INVALID_JSONS);
+
+ pipeline.run();
+ }
+
@Test(expected = Pipeline.PipelineExecutionException.class)
public void failParsingWithoutCustomMapper() {
PCollection<MyPojo> output =
@@ -134,6 +173,27 @@ public void failWritingWithoutCustomMapper() {
pipeline.run();
}
+ @Test
+ public void collectWriteFailuresWithoutCustomMapper() {
+ TupleTag<Failure<IOException, MyEmptyBean>> failureTag =
+ new TupleTag<Failure<IOException, MyEmptyBean>>() {};
+
+ final PCollectionTuple pcs =
+ pipeline
+ .apply(Create.of(EMPTY_BEANS))
+ .apply(AsJsons.of(MyEmptyBean.class).withFailureTag(failureTag));
+
+ pcs.get(AsJsons.successTag()).setCoder(StringUtf8Coder.of());
+ PAssert.that(pcs.get(AsJsons.successTag())).empty();
+
+ PCollection<MyEmptyBean> failureValues =
+ pcs.get(failureTag)
+
.apply(MapElements.into(TypeDescriptor.of(MyEmptyBean.class)).via(Failure::value));
+ PAssert.that(failureValues).containsInAnyOrder(EMPTY_BEANS);
+
+ pipeline.run();
+ }
+
@Test
public void writeUsingCustomMapper() {
ObjectMapper customMapper =
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 160774)
Time Spent: 4.5h (was: 4h 20m)
Remaining Estimate: 163.5h (was: 163h 40m)
> Add exception handling to single message transforms in Java SDK
> ---------------------------------------------------------------
>
> Key: BEAM-5638
> URL: https://issues.apache.org/jira/browse/BEAM-5638
> Project: Beam
> Issue Type: Improvement
> Components: sdk-java-core
> Reporter: Jeff Klukas
> Assignee: Jeff Klukas
> Priority: Minor
> Original Estimate: 168h
> Time Spent: 4.5h
> Remaining Estimate: 163.5h
>
> Add methods to MapElements, FlatMapElements, and Filter that allow users to
> specify expected exceptions and tuple tags to associate with the
> collections of successfully and unsuccessfully processed elements.
> See discussion on dev list:
> https://lists.apache.org/thread.html/936ed2a5f2c01be066fd903abf70130625e0b8cf4028c11b89b8b23f@%3Cdev.beam.apache.org%3E
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)