Test that SimpleDoFnRunner wraps exceptions in startBundle and finishBundle


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3b4c7d10
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3b4c7d10
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3b4c7d10

Branch: refs/heads/gearpump-runner
Commit: 3b4c7d103c07e73d30b2ad534a17b3059232dbda
Parents: 8af13b0
Author: Kenneth Knowles <k...@google.com>
Authored: Fri Dec 16 13:43:54 2016 -0800
Committer: Kenneth Knowles <k...@google.com>
Committed: Fri Dec 16 20:14:19 2016 -0800

----------------------------------------------------------------------
 .../beam/runners/core/SimpleDoFnRunnerTest.java | 53 ++++++++++++++++++++
 1 file changed, 53 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3b4c7d10/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
index f068c19..837a162 100644
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
@@ -151,6 +151,49 @@ public class SimpleDoFnRunnerTest {
             TimeDomain.EVENT_TIME);
   }
 
+  @Test
+  public void testStartBundleExceptionsWrappedAsUserCodeException() {
+    ThrowingDoFn fn = new ThrowingDoFn();
+    DoFnRunner<String, String> runner =
+        new SimpleDoFnRunner<>(
+            null,
+            fn,
+            null,
+            null,
+            null,
+            Collections.<TupleTag<?>>emptyList(),
+            mockStepContext,
+            null,
+            WindowingStrategy.of(new GlobalWindows()));
+
+    thrown.expect(UserCodeException.class);
+    thrown.expectCause(is(fn.exceptionToThrow));
+
+    runner.startBundle();
+  }
+
+  @Test
+  public void testFinishBundleExceptionsWrappedAsUserCodeException() {
+    ThrowingDoFn fn = new ThrowingDoFn();
+    DoFnRunner<String, String> runner =
+        new SimpleDoFnRunner<>(
+            null,
+            fn,
+            null,
+            null,
+            null,
+            Collections.<TupleTag<?>>emptyList(),
+            mockStepContext,
+            null,
+            WindowingStrategy.of(new GlobalWindows()));
+
+    thrown.expect(UserCodeException.class);
+    thrown.expectCause(is(fn.exceptionToThrow));
+
+    runner.finishBundle();
+  }
+
+
   /**
    * Tests that {@link SimpleDoFnRunner#onTimer} properly dispatches to the 
underlying
    * {@link DoFn}.
@@ -200,6 +243,16 @@ public class SimpleDoFnRunnerTest {
     @TimerId(TIMER_ID)
     private static final TimerSpec timer = 
TimerSpecs.timer(TimeDomain.EVENT_TIME);
 
+    @StartBundle
+    public void startBundle(Context c) throws Exception {
+      throw exceptionToThrow;
+    }
+
+    @FinishBundle
+    public void finishBundle(Context c) throws Exception {
+      throw exceptionToThrow;
+    }
+
     @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
       throw exceptionToThrow;

Reply via email to