Move ParDo Lifecycle tests to their own file

These tests are not yet functional in all runners; moving them to their
own file makes them easier to exclude for those runners.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/29cbdceb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/29cbdceb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/29cbdceb

Branch: refs/heads/master
Commit: 29cbdceb5b78ce86ad0d90050d7542b0d5b45362
Parents: 12abb1b
Author: Thomas Groh <tg...@google.com>
Authored: Thu Aug 11 10:45:43 2016 -0700
Committer: Dan Halperin <dhalp...@google.com>
Committed: Mon Aug 15 14:16:54 2016 -0700

----------------------------------------------------------------------
 runners/google-cloud-dataflow-java/pom.xml      |  10 +
 .../beam/sdk/transforms/ParDoLifecycleTest.java | 448 +++++++++++++++++++
 .../apache/beam/sdk/transforms/ParDoTest.java   | 405 -----------------
 3 files changed, 458 insertions(+), 405 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/29cbdceb/runners/google-cloud-dataflow-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml
index 86991b7..c32e184 100644
--- a/runners/google-cloud-dataflow-java/pom.xml
+++ b/runners/google-cloud-dataflow-java/pom.xml
@@ -60,6 +60,16 @@
             <beamUseDummyRunner>true</beamUseDummyRunner>
           </systemPropertyVariables>
         </configuration>
+        <executions>
+          <execution>
+            <id>runnable-on-service-tests</id>
+            <configuration>
+              <excludes>
+                <exclude>org/apache/beam/sdk/transforms/ParDoLifecycleTest.java</exclude>
+              </excludes>
+            </configuration>
+          </execution>
+        </executions>
       </plugin>
 
       <!-- Run CheckStyle pass on transforms, as they are release in

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/29cbdceb/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoLifecycleTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoLifecycleTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoLifecycleTest.java
new file mode 100644
index 0000000..272fea7
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoLifecycleTest.java
@@ -0,0 +1,448 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.transforms;
+
+import static org.hamcrest.Matchers.anyOf;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.RunnableOnService;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Tests that {@link ParDo} exercises {@link DoFn} methods in the appropriate sequence.
+ */
+@RunWith(JUnit4.class)
+public class ParDoLifecycleTest {
+  @Test
+  @Category(RunnableOnService.class)
+  public void testOldFnCallSequence() {
+    TestPipeline p = TestPipeline.create();
+    PCollectionList.of(p.apply("Impolite", Create.of(1, 2, 4)))
+        .and(p.apply("Polite", Create.of(3, 5, 6, 7)))
+        .apply(Flatten.<Integer>pCollections())
+        .apply(ParDo.of(new CallSequenceEnforcingOldFn<Integer>()));
+
+    p.run();
+  }
+
+  @Test
+  @Category(RunnableOnService.class)
+  public void testOldFnCallSequenceMulti() {
+    TestPipeline p = TestPipeline.create();
+    PCollectionList.of(p.apply("Impolite", Create.of(1, 2, 4)))
+        .and(p.apply("Polite", Create.of(3, 5, 6, 7)))
+        .apply(Flatten.<Integer>pCollections())
+        .apply(ParDo.of(new CallSequenceEnforcingOldFn<Integer>())
+            .withOutputTags(new TupleTag<Integer>() {}, TupleTagList.empty()));
+
+    p.run();
+  }
+
+  private static class CallSequenceEnforcingOldFn<T> extends OldDoFn<T, T> {
+    private boolean setupCalled = false;
+    private int startBundleCalls = 0;
+    private int finishBundleCalls = 0;
+    private boolean teardownCalled = false;
+
+    @Override
+    public void setup() {
+      assertThat("setup should not be called twice", setupCalled, is(false));
+      assertThat("setup should be called before startBundle", 
startBundleCalls, equalTo(0));
+      assertThat("setup should be called before finishBundle", 
finishBundleCalls, equalTo(0));
+      assertThat("setup should be called before teardown", teardownCalled, 
is(false));
+      setupCalled = true;
+    }
+
+    @Override
+    public void startBundle(Context c) {
+      assertThat("setup should have been called", setupCalled, is(true));
+      assertThat(
+          "Even number of startBundle and finishBundle calls in startBundle",
+          startBundleCalls,
+          equalTo(finishBundleCalls));
+      assertThat("teardown should not have been called", teardownCalled, 
is(false));
+      startBundleCalls++;
+    }
+
+    @Override
+    public void processElement(ProcessContext c) throws Exception {
+      assertThat("startBundle should have been called", startBundleCalls, 
greaterThan(0));
+      assertThat(
+          "there should be one startBundle call with no call to finishBundle",
+          startBundleCalls,
+          equalTo(finishBundleCalls + 1));
+      assertThat("teardown should not have been called", teardownCalled, 
is(false));
+    }
+
+    @Override
+    public void finishBundle(Context c) {
+      assertThat("startBundle should have been called", startBundleCalls, 
greaterThan(0));
+      assertThat(
+          "there should be one bundle that has been started but not finished",
+          startBundleCalls,
+          equalTo(finishBundleCalls + 1));
+      assertThat("teardown should not have been called", teardownCalled, 
is(false));
+      finishBundleCalls++;
+    }
+
+    @Override
+    public void teardown() {
+      assertThat(setupCalled, is(true));
+      assertThat(startBundleCalls, anyOf(equalTo(finishBundleCalls)));
+      assertThat(teardownCalled, is(false));
+      teardownCalled = true;
+    }
+  }
+
+  @Test
+  @Category(RunnableOnService.class)
+  public void testFnCallSequence() {
+    TestPipeline p = TestPipeline.create();
+    PCollectionList.of(p.apply("Impolite", Create.of(1, 2, 4)))
+        .and(p.apply("Polite", Create.of(3, 5, 6, 7)))
+        .apply(Flatten.<Integer>pCollections())
+        .apply(ParDo.of(new CallSequenceEnforcingFn<Integer>()));
+
+    p.run();
+  }
+
+  @Test
+  @Category(RunnableOnService.class)
+  public void testFnCallSequenceMulti() {
+    TestPipeline p = TestPipeline.create();
+    PCollectionList.of(p.apply("Impolite", Create.of(1, 2, 4)))
+        .and(p.apply("Polite", Create.of(3, 5, 6, 7)))
+        .apply(Flatten.<Integer>pCollections())
+        .apply(ParDo.of(new CallSequenceEnforcingFn<Integer>())
+            .withOutputTags(new TupleTag<Integer>() {
+            }, TupleTagList.empty()));
+
+    p.run();
+  }
+
+  private static class CallSequenceEnforcingFn<T> extends DoFn<T, T> {
+    private boolean setupCalled = false;
+    private int startBundleCalls = 0;
+    private int finishBundleCalls = 0;
+    private boolean teardownCalled = false;
+
+    @Setup
+    public void before() {
+      assertThat("setup should not be called twice", setupCalled, is(false));
+      assertThat("setup should be called before startBundle", 
startBundleCalls, equalTo(0));
+      assertThat("setup should be called before finishBundle", 
finishBundleCalls, equalTo(0));
+      assertThat("setup should be called before teardown", teardownCalled, 
is(false));
+      setupCalled = true;
+    }
+
+    @StartBundle
+    public void begin(Context c) {
+      assertThat("setup should have been called", setupCalled, is(true));
+      assertThat("Even number of startBundle and finishBundle calls in 
startBundle",
+          startBundleCalls,
+          equalTo(finishBundleCalls));
+      assertThat("teardown should not have been called", teardownCalled, 
is(false));
+      startBundleCalls++;
+    }
+
+    @ProcessElement
+    public void process(ProcessContext c) throws Exception {
+      assertThat("startBundle should have been called", startBundleCalls, 
greaterThan(0));
+      assertThat("there should be one startBundle call with no call to 
finishBundle",
+          startBundleCalls,
+          equalTo(finishBundleCalls + 1));
+      assertThat("teardown should not have been called", teardownCalled, 
is(false));
+    }
+
+    @FinishBundle
+    public void end(Context c) {
+      assertThat("startBundle should have been called", startBundleCalls, 
greaterThan(0));
+      assertThat("there should be one bundle that has been started but not 
finished",
+          startBundleCalls,
+          equalTo(finishBundleCalls + 1));
+      assertThat("teardown should not have been called", teardownCalled, 
is(false));
+      finishBundleCalls++;
+    }
+
+    @Teardown
+    public void after() {
+      assertThat(setupCalled, is(true));
+      assertThat(startBundleCalls, anyOf(equalTo(finishBundleCalls)));
+      assertThat(teardownCalled, is(false));
+      teardownCalled = true;
+    }
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testTeardownCalledAfterExceptionInSetup() {
+    TestPipeline p = TestPipeline.create();
+    ExceptionThrowingOldFn fn = new 
ExceptionThrowingOldFn(MethodForException.SETUP);
+    p
+        .apply(Create.of(1, 2, 3))
+        .apply(ParDo.of(fn));
+    try {
+      p.run();
+      fail("Pipeline should have failed with an exception");
+    } catch (Exception e) {
+      assertThat(
+          "Function should have been torn down after exception",
+          ExceptionThrowingOldFn.teardownCalled.get(),
+          is(true));
+    }
+  }
+
+
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testTeardownCalledAfterExceptionInStartBundle() {
+    TestPipeline p = TestPipeline.create();
+    ExceptionThrowingOldFn fn = new 
ExceptionThrowingOldFn(MethodForException.START_BUNDLE);
+    p
+        .apply(Create.of(1, 2, 3))
+        .apply(ParDo.of(fn));
+    try {
+      p.run();
+      fail("Pipeline should have failed with an exception");
+    } catch (Exception e) {
+      assertThat(
+          "Function should have been torn down after exception",
+          ExceptionThrowingOldFn.teardownCalled.get(),
+          is(true));
+    }
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testTeardownCalledAfterExceptionInProcessElement() {
+    TestPipeline p = TestPipeline.create();
+    ExceptionThrowingOldFn fn = new 
ExceptionThrowingOldFn(MethodForException.PROCESS_ELEMENT);
+    p
+        .apply(Create.of(1, 2, 3))
+        .apply(ParDo.of(fn));
+    try {
+      p.run();
+      fail("Pipeline should have failed with an exception");
+    } catch (Exception e) {
+      assertThat(
+          "Function should have been torn down after exception",
+          ExceptionThrowingOldFn.teardownCalled.get(),
+          is(true));
+    }
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testTeardownCalledAfterExceptionInFinishBundle() {
+    TestPipeline p = TestPipeline.create();
+    ExceptionThrowingOldFn fn = new 
ExceptionThrowingOldFn(MethodForException.FINISH_BUNDLE);
+    p
+        .apply(Create.of(1, 2, 3))
+        .apply(ParDo.of(fn));
+    try {
+      p.run();
+      fail("Pipeline should have failed with an exception");
+    } catch (Exception e) {
+      assertThat(
+          "Function should have been torn down after exception",
+          ExceptionThrowingOldFn.teardownCalled.get(),
+          is(true));
+    }
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testWithContextTeardownCalledAfterExceptionInSetup() {
+    TestPipeline p = TestPipeline.create();
+    ExceptionThrowingOldFn fn = new 
ExceptionThrowingOldFn(MethodForException.SETUP);
+    p.apply(Create.of(1, 2, 3)).apply(ParDo.of(fn));
+    try {
+      p.run();
+      fail("Pipeline should have failed with an exception");
+    } catch (Exception e) {
+      assertThat("Function should have been torn down after exception",
+          ExceptionThrowingOldFn.teardownCalled.get(),
+          is(true));
+    }
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testWithContextTeardownCalledAfterExceptionInStartBundle() {
+    TestPipeline p = TestPipeline.create();
+    ExceptionThrowingOldFn fn = new 
ExceptionThrowingOldFn(MethodForException.START_BUNDLE);
+    p.apply(Create.of(1, 2, 3)).apply(ParDo.of(fn));
+    try {
+      p.run();
+      fail("Pipeline should have failed with an exception");
+    } catch (Exception e) {
+      assertThat("Function should have been torn down after exception",
+          ExceptionThrowingOldFn.teardownCalled.get(),
+          is(true));
+    }
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testWithContextTeardownCalledAfterExceptionInProcessElement() {
+    TestPipeline p = TestPipeline.create();
+    ExceptionThrowingOldFn fn = new 
ExceptionThrowingOldFn(MethodForException.PROCESS_ELEMENT);
+    p.apply(Create.of(1, 2, 3)).apply(ParDo.of(fn));
+    try {
+      p.run();
+      fail("Pipeline should have failed with an exception");
+    } catch (Exception e) {
+      assertThat("Function should have been torn down after exception",
+          ExceptionThrowingOldFn.teardownCalled.get(),
+          is(true));
+    }
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testWithContextTeardownCalledAfterExceptionInFinishBundle() {
+    TestPipeline p = TestPipeline.create();
+    ExceptionThrowingOldFn fn = new 
ExceptionThrowingOldFn(MethodForException.FINISH_BUNDLE);
+    p.apply(Create.of(1, 2, 3)).apply(ParDo.of(fn));
+    try {
+      p.run();
+      fail("Pipeline should have failed with an exception");
+    } catch (Exception e) {
+      assertThat("Function should have been torn down after exception",
+          ExceptionThrowingOldFn.teardownCalled.get(),
+          is(true));
+    }
+  }
+
+  private static class ExceptionThrowingOldFn extends OldDoFn<Object, Object> {
+    static AtomicBoolean teardownCalled = new AtomicBoolean(false);
+
+    private final MethodForException toThrow;
+    private boolean thrown;
+
+    private ExceptionThrowingOldFn(MethodForException toThrow) {
+      this.toThrow = toThrow;
+    }
+
+    @Override
+    public void setup() throws Exception {
+      throwIfNecessary(MethodForException.SETUP);
+    }
+
+    @Override
+    public void startBundle(Context c) throws Exception {
+      throwIfNecessary(MethodForException.START_BUNDLE);
+    }
+
+    @Override
+    public void processElement(ProcessContext c) throws Exception {
+      throwIfNecessary(MethodForException.PROCESS_ELEMENT);
+    }
+
+    @Override
+    public void finishBundle(Context c) throws Exception {
+      throwIfNecessary(MethodForException.FINISH_BUNDLE);
+    }
+
+    private void throwIfNecessary(MethodForException method) throws Exception {
+      if (toThrow == method && !thrown) {
+        thrown = true;
+        throw new Exception("Hasn't yet thrown");
+      }
+    }
+
+    @Override
+    public void teardown() {
+      if (!thrown) {
+        fail("Excepted to have a processing method throw an exception");
+      }
+      teardownCalled.set(true);
+    }
+  }
+
+
+  private static class ExceptionThrowingFn extends DoFn<Object, Object> {
+    static AtomicBoolean teardownCalled = new AtomicBoolean(false);
+
+    private final MethodForException toThrow;
+    private boolean thrown;
+
+    private ExceptionThrowingFn(MethodForException toThrow) {
+      this.toThrow = toThrow;
+    }
+
+    @Setup
+    public void before() throws Exception {
+      throwIfNecessary(MethodForException.SETUP);
+    }
+
+    @StartBundle
+    public void preBundle(Context c) throws Exception {
+      throwIfNecessary(MethodForException.START_BUNDLE);
+    }
+
+    @ProcessElement
+    public void perElement(ProcessContext c) throws Exception {
+      throwIfNecessary(MethodForException.PROCESS_ELEMENT);
+    }
+
+    @FinishBundle
+    public void postBundle(Context c) throws Exception {
+      throwIfNecessary(MethodForException.FINISH_BUNDLE);
+    }
+
+    private void throwIfNecessary(MethodForException method) throws Exception {
+      if (toThrow == method && !thrown) {
+        thrown = true;
+        throw new Exception("Hasn't yet thrown");
+      }
+    }
+
+    @Teardown
+    public void after() {
+      if (!thrown) {
+        fail("Excepted to have a processing method throw an exception");
+      }
+      teardownCalled.set(true);
+    }
+  }
+
+  private enum MethodForException {
+    SETUP,
+    START_BUNDLE,
+    PROCESS_ELEMENT,
+    FINISH_BUNDLE
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/29cbdceb/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 8460124..c384114 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -28,14 +28,11 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import static org.hamcrest.Matchers.allOf;
 import static org.hamcrest.Matchers.anyOf;
 import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.greaterThan;
-import static org.hamcrest.Matchers.is;
 import static 
org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
 import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
-import static org.junit.Assert.fail;
 
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.AtomicCoder;
@@ -54,7 +51,6 @@ import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionList;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TimestampedValue;
@@ -80,7 +76,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Tests for ParDo.
@@ -1475,404 +1470,4 @@ public class ParDoTest implements Serializable {
     assertThat(displayData, includesDisplayDataFrom(fn));
     assertThat(displayData, hasDisplayItem("fn", fn.getClass()));
   }
-
-  @Test
-  @Category(RunnableOnService.class)
-  public void testFnCallSequence() {
-    TestPipeline p = TestPipeline.create();
-    PCollectionList.of(p.apply("Impolite", Create.of(1, 2, 4)))
-        .and(p.apply("Polite", Create.of(3, 5, 6, 7)))
-        .apply(Flatten.<Integer>pCollections())
-        .apply(ParDo.of(new CallSequenceEnforcingOldFn<Integer>()));
-
-    p.run();
-  }
-
-  @Test
-  @Category(RunnableOnService.class)
-  public void testFnCallSequenceMulti() {
-    TestPipeline p = TestPipeline.create();
-    PCollectionList.of(p.apply("Impolite", Create.of(1, 2, 4)))
-        .and(p.apply("Polite", Create.of(3, 5, 6, 7)))
-        .apply(Flatten.<Integer>pCollections())
-        .apply(ParDo.of(new CallSequenceEnforcingOldFn<Integer>())
-                .withOutputTags(new TupleTag<Integer>() {}, 
TupleTagList.empty()));
-
-    p.run();
-  }
-
-  private static class CallSequenceEnforcingOldFn<T> extends OldDoFn<T, T> {
-    private boolean setupCalled = false;
-    private int startBundleCalls = 0;
-    private int finishBundleCalls = 0;
-    private boolean teardownCalled = false;
-
-    @Override
-    public void setup() {
-      assertThat("setup should not be called twice", setupCalled, is(false));
-      assertThat("setup should be called before startBundle", 
startBundleCalls, equalTo(0));
-      assertThat("setup should be called before finishBundle", 
finishBundleCalls, equalTo(0));
-      assertThat("setup should be called before teardown", teardownCalled, 
is(false));
-      setupCalled = true;
-    }
-
-    @Override
-    public void startBundle(Context c) {
-      assertThat("setup should have been called", setupCalled, is(true));
-      assertThat(
-          "Even number of startBundle and finishBundle calls in startBundle",
-          startBundleCalls,
-          equalTo(finishBundleCalls));
-      assertThat("teardown should not have been called", teardownCalled, 
is(false));
-      startBundleCalls++;
-    }
-
-    @Override
-    public void processElement(ProcessContext c) throws Exception {
-      assertThat("startBundle should have been called", startBundleCalls, 
greaterThan(0));
-      assertThat(
-          "there should be one startBundle call with no call to finishBundle",
-          startBundleCalls,
-          equalTo(finishBundleCalls + 1));
-      assertThat("teardown should not have been called", teardownCalled, 
is(false));
-    }
-
-    @Override
-    public void finishBundle(Context c) {
-      assertThat("startBundle should have been called", startBundleCalls, 
greaterThan(0));
-      assertThat(
-          "there should be one bundle that has been started but not finished",
-          startBundleCalls,
-          equalTo(finishBundleCalls + 1));
-      assertThat("teardown should not have been called", teardownCalled, 
is(false));
-      finishBundleCalls++;
-    }
-
-    @Override
-    public void teardown() {
-      assertThat(setupCalled, is(true));
-      assertThat(startBundleCalls, anyOf(equalTo(finishBundleCalls)));
-      assertThat(teardownCalled, is(false));
-      teardownCalled = true;
-    }
-  }
-
-  @Test
-  @Category(RunnableOnService.class)
-  public void testFnWithContextCallSequence() {
-    TestPipeline p = TestPipeline.create();
-    PCollectionList.of(p.apply("Impolite", Create.of(1, 2, 4)))
-        .and(p.apply("Polite", Create.of(3, 5, 6, 7)))
-        .apply(Flatten.<Integer>pCollections())
-        .apply(ParDo.of(new CallSequenceEnforcingFn<Integer>()));
-
-    p.run();
-  }
-
-  @Test
-  @Category(RunnableOnService.class)
-  public void testFnWithContextCallSequenceMulti() {
-    TestPipeline p = TestPipeline.create();
-    PCollectionList.of(p.apply("Impolite", Create.of(1, 2, 4)))
-        .and(p.apply("Polite", Create.of(3, 5, 6, 7)))
-        .apply(Flatten.<Integer>pCollections())
-        .apply(ParDo.of(new CallSequenceEnforcingFn<Integer>())
-            .withOutputTags(new TupleTag<Integer>() {
-            }, TupleTagList.empty()));
-
-    p.run();
-  }
-
-  private static class CallSequenceEnforcingFn<T> extends DoFn<T, T> {
-    private boolean setupCalled = false;
-    private int startBundleCalls = 0;
-    private int finishBundleCalls = 0;
-    private boolean teardownCalled = false;
-
-    @Setup
-    public void before() {
-      assertThat("setup should not be called twice", setupCalled, is(false));
-      assertThat("setup should be called before startBundle", 
startBundleCalls, equalTo(0));
-      assertThat("setup should be called before finishBundle", 
finishBundleCalls, equalTo(0));
-      assertThat("setup should be called before teardown", teardownCalled, 
is(false));
-      setupCalled = true;
-    }
-
-    @StartBundle
-    public void begin(Context c) {
-      assertThat("setup should have been called", setupCalled, is(true));
-      assertThat("Even number of startBundle and finishBundle calls in 
startBundle",
-          startBundleCalls,
-          equalTo(finishBundleCalls));
-      assertThat("teardown should not have been called", teardownCalled, 
is(false));
-      startBundleCalls++;
-    }
-
-    @ProcessElement
-    public void process(ProcessContext c) throws Exception {
-      assertThat("startBundle should have been called", startBundleCalls, 
greaterThan(0));
-      assertThat("there should be one startBundle call with no call to 
finishBundle",
-          startBundleCalls,
-          equalTo(finishBundleCalls + 1));
-      assertThat("teardown should not have been called", teardownCalled, 
is(false));
-    }
-
-    @FinishBundle
-    public void end(Context c) {
-      assertThat("startBundle should have been called", startBundleCalls, 
greaterThan(0));
-      assertThat("there should be one bundle that has been started but not 
finished",
-          startBundleCalls,
-          equalTo(finishBundleCalls + 1));
-      assertThat("teardown should not have been called", teardownCalled, 
is(false));
-      finishBundleCalls++;
-    }
-
-    @Teardown
-    public void after() {
-      assertThat(setupCalled, is(true));
-      assertThat(startBundleCalls, anyOf(equalTo(finishBundleCalls)));
-      assertThat(teardownCalled, is(false));
-      teardownCalled = true;
-    }
-  }
-
-  @Test
-  @Category(NeedsRunner.class)
-  public void testTeardownCalledAfterExceptionInSetup() {
-    TestPipeline p = TestPipeline.create();
-    ExceptionThrowingOldFn fn = new 
ExceptionThrowingOldFn(MethodForException.SETUP);
-    p
-        .apply(Create.of(1, 2, 3))
-        .apply(ParDo.of(fn));
-    try {
-      p.run();
-      fail("Pipeline should have failed with an exception");
-    } catch (Exception e) {
-      assertThat(
-          "Function should have been torn down after exception",
-          ExceptionThrowingOldFn.teardownCalled.get(),
-          is(true));
-    }
-  }
-
-  @Test
-  @Category(NeedsRunner.class)
-  public void testTeardownCalledAfterExceptionInStartBundle() {
-    TestPipeline p = TestPipeline.create();
-    ExceptionThrowingOldFn fn = new 
ExceptionThrowingOldFn(MethodForException.START_BUNDLE);
-    p
-        .apply(Create.of(1, 2, 3))
-        .apply(ParDo.of(fn));
-    try {
-      p.run();
-      fail("Pipeline should have failed with an exception");
-    } catch (Exception e) {
-      assertThat(
-          "Function should have been torn down after exception",
-          ExceptionThrowingOldFn.teardownCalled.get(),
-          is(true));
-    }
-  }
-
-  @Test
-  @Category(NeedsRunner.class)
-  public void testTeardownCalledAfterExceptionInProcessElement() {
-    TestPipeline p = TestPipeline.create();
-    ExceptionThrowingOldFn fn = new 
ExceptionThrowingOldFn(MethodForException.PROCESS_ELEMENT);
-    p
-        .apply(Create.of(1, 2, 3))
-        .apply(ParDo.of(fn));
-    try {
-      p.run();
-      fail("Pipeline should have failed with an exception");
-    } catch (Exception e) {
-      assertThat(
-          "Function should have been torn down after exception",
-          ExceptionThrowingOldFn.teardownCalled.get(),
-          is(true));
-    }
-  }
-
-  @Test
-  @Category(NeedsRunner.class)
-  public void testTeardownCalledAfterExceptionInFinishBundle() {
-    TestPipeline p = TestPipeline.create();
-    ExceptionThrowingOldFn fn = new 
ExceptionThrowingOldFn(MethodForException.FINISH_BUNDLE);
-    p
-        .apply(Create.of(1, 2, 3))
-        .apply(ParDo.of(fn));
-    try {
-      p.run();
-      fail("Pipeline should have failed with an exception");
-    } catch (Exception e) {
-      assertThat(
-          "Function should have been torn down after exception",
-          ExceptionThrowingOldFn.teardownCalled.get(),
-          is(true));
-    }
-  }
-
-  @Test
-  @Category(NeedsRunner.class)
-  public void testWithContextTeardownCalledAfterExceptionInSetup() {
-    TestPipeline p = TestPipeline.create();
-    ExceptionThrowingOldFn fn = new 
ExceptionThrowingOldFn(MethodForException.SETUP);
-    p.apply(Create.of(1, 2, 3)).apply(ParDo.of(fn));
-    try {
-      p.run();
-      fail("Pipeline should have failed with an exception");
-    } catch (Exception e) {
-      assertThat("Function should have been torn down after exception",
-          ExceptionThrowingOldFn.teardownCalled.get(),
-          is(true));
-    }
-  }
-
-  @Test
-  @Category(NeedsRunner.class)
-  public void testWithContextTeardownCalledAfterExceptionInStartBundle() {
-    TestPipeline p = TestPipeline.create();
-    ExceptionThrowingOldFn fn = new 
ExceptionThrowingOldFn(MethodForException.START_BUNDLE);
-    p.apply(Create.of(1, 2, 3)).apply(ParDo.of(fn));
-    try {
-      p.run();
-      fail("Pipeline should have failed with an exception");
-    } catch (Exception e) {
-      assertThat("Function should have been torn down after exception",
-          ExceptionThrowingOldFn.teardownCalled.get(),
-          is(true));
-    }
-  }
-
-  @Test
-  @Category(NeedsRunner.class)
-  public void testWithContextTeardownCalledAfterExceptionInProcessElement() {
-    TestPipeline p = TestPipeline.create();
-    ExceptionThrowingOldFn fn = new 
ExceptionThrowingOldFn(MethodForException.PROCESS_ELEMENT);
-    p.apply(Create.of(1, 2, 3)).apply(ParDo.of(fn));
-    try {
-      p.run();
-      fail("Pipeline should have failed with an exception");
-    } catch (Exception e) {
-      assertThat("Function should have been torn down after exception",
-          ExceptionThrowingOldFn.teardownCalled.get(),
-          is(true));
-    }
-  }
-
-  @Test
-  @Category(NeedsRunner.class)
-  public void testWithContextTeardownCalledAfterExceptionInFinishBundle() {
-    TestPipeline p = TestPipeline.create();
-    ExceptionThrowingOldFn fn = new 
ExceptionThrowingOldFn(MethodForException.FINISH_BUNDLE);
-    p.apply(Create.of(1, 2, 3)).apply(ParDo.of(fn));
-    try {
-      p.run();
-      fail("Pipeline should have failed with an exception");
-    } catch (Exception e) {
-      assertThat("Function should have been torn down after exception",
-          ExceptionThrowingOldFn.teardownCalled.get(),
-          is(true));
-    }
-  }
-
-  private static class ExceptionThrowingOldFn extends OldDoFn<Object, Object> {
-    static AtomicBoolean teardownCalled = new AtomicBoolean(false);
-
-    private final MethodForException toThrow;
-    private boolean thrown;
-
-    private ExceptionThrowingOldFn(MethodForException toThrow) {
-      this.toThrow = toThrow;
-    }
-
-    @Override
-    public void setup() throws Exception {
-      throwIfNecessary(MethodForException.SETUP);
-    }
-
-    @Override
-    public void startBundle(Context c) throws Exception {
-      throwIfNecessary(MethodForException.START_BUNDLE);
-    }
-
-    @Override
-    public void processElement(ProcessContext c) throws Exception {
-      throwIfNecessary(MethodForException.PROCESS_ELEMENT);
-    }
-
-    @Override
-    public void finishBundle(Context c) throws Exception {
-      throwIfNecessary(MethodForException.FINISH_BUNDLE);
-    }
-
-    private void throwIfNecessary(MethodForException method) throws Exception {
-      if (toThrow == method && !thrown) {
-        thrown = true;
-        throw new Exception("Hasn't yet thrown");
-      }
-    }
-
-    @Override
-    public void teardown() {
-      if (!thrown) {
-        fail("Excepted to have a processing method throw an exception");
-      }
-      teardownCalled.set(true);
-    }
-  }
-
-
-  private static class ExceptionThrowingFn extends DoFn<Object, Object> {
-    static AtomicBoolean teardownCalled = new AtomicBoolean(false);
-
-    private final MethodForException toThrow;
-    private boolean thrown;
-
-    private ExceptionThrowingFn(MethodForException toThrow) {
-      this.toThrow = toThrow;
-    }
-
-    @Setup
-    public void before() throws Exception {
-      throwIfNecessary(MethodForException.SETUP);
-    }
-
-    @StartBundle
-    public void preBundle(Context c) throws Exception {
-      throwIfNecessary(MethodForException.START_BUNDLE);
-    }
-
-    @ProcessElement
-    public void perElement(ProcessContext c) throws Exception {
-      throwIfNecessary(MethodForException.PROCESS_ELEMENT);
-    }
-
-    @FinishBundle
-    public void postBundle(Context c) throws Exception {
-      throwIfNecessary(MethodForException.FINISH_BUNDLE);
-    }
-
-    private void throwIfNecessary(MethodForException method) throws Exception {
-      if (toThrow == method && !thrown) {
-        thrown = true;
-        throw new Exception("Hasn't yet thrown");
-      }
-    }
-
-    @Teardown
-    public void after() {
-      if (!thrown) {
-        fail("Excepted to have a processing method throw an exception");
-      }
-      teardownCalled.set(true);
-    }
-  }
-
-  private enum MethodForException {
-    SETUP,
-    START_BUNDLE,
-    PROCESS_ELEMENT,
-    FINISH_BUNDLE
-  }
 }

Reply via email to