Unwrap UserCodeException in DoFnTester. The execution of user code and system code is intertwined in the OldDoFn wrapper of DoFn, so DoFnTester will sometimes encounter a wrapped UserCodeException where previously the exception would not have been wrapped.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a1d601af Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a1d601af Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a1d601af Branch: refs/heads/gearpump-runner Commit: a1d601afd0b98bf6183b14a8bbd5e6b8bee0233c Parents: 2c8eb42 Author: Kenneth Knowles <[email protected]> Authored: Tue Aug 9 12:39:41 2016 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Tue Aug 9 12:41:52 2016 -0700 ---------------------------------------------------------------------- .../apache/beam/sdk/transforms/DoFnTester.java | 29 ++++++++++++++++++-- 1 file changed, 26 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a1d601af/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java index e2764eb..a2ce6c9 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java @@ -26,6 +26,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.util.TimerInternals; +import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingInternals; import org.apache.beam.sdk.util.state.InMemoryStateInternals; @@ -190,10 +191,24 @@ public class DoFnTester<InputT, OutputT> { initializeState(); TestContext<InputT, OutputT> context = createContext(fn); 
context.setupDelegateAggregators(); - fn.startBundle(context); + try { + fn.startBundle(context); + } catch (UserCodeException e) { + unwrapUserCodeException(e); + } state = State.STARTED; } + private static void unwrapUserCodeException(UserCodeException e) throws Exception { + if (e.getCause() instanceof Exception) { + throw (Exception) e.getCause(); + } else if (e.getCause() instanceof Error) { + throw (Error) e.getCause(); + } else { + throw e; + } + } + /** * Calls {@link OldDoFn#processElement} on the {@code OldDoFn} under test, in a * context where {@link OldDoFn.ProcessContext#element} returns the @@ -212,7 +227,11 @@ public class DoFnTester<InputT, OutputT> { if (state == State.UNSTARTED) { startBundle(); } - fn.processElement(createProcessContext(fn, element)); + try { + fn.processElement(createProcessContext(fn, element)); + } catch (UserCodeException e) { + unwrapUserCodeException(e); + } } /** @@ -231,7 +250,11 @@ public class DoFnTester<InputT, OutputT> { if (state == State.UNSTARTED) { startBundle(); } - fn.finishBundle(createContext(fn)); + try { + fn.finishBundle(createContext(fn)); + } catch (UserCodeException e) { + unwrapUserCodeException(e); + } state = State.FINISHED; }
