Move ParDo Lifecycle tests to their own file These tests are not yet functional in all runners, and this makes them easier to ignore.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/29cbdceb Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/29cbdceb Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/29cbdceb Branch: refs/heads/master Commit: 29cbdceb5b78ce86ad0d90050d7542b0d5b45362 Parents: 12abb1b Author: Thomas Groh <tg...@google.com> Authored: Thu Aug 11 10:45:43 2016 -0700 Committer: Dan Halperin <dhalp...@google.com> Committed: Mon Aug 15 14:16:54 2016 -0700 ---------------------------------------------------------------------- runners/google-cloud-dataflow-java/pom.xml | 10 + .../beam/sdk/transforms/ParDoLifecycleTest.java | 448 +++++++++++++++++++ .../apache/beam/sdk/transforms/ParDoTest.java | 405 ----------------- 3 files changed, 458 insertions(+), 405 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/29cbdceb/runners/google-cloud-dataflow-java/pom.xml ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml index 86991b7..c32e184 100644 --- a/runners/google-cloud-dataflow-java/pom.xml +++ b/runners/google-cloud-dataflow-java/pom.xml @@ -60,6 +60,16 @@ <beamUseDummyRunner>true</beamUseDummyRunner> </systemPropertyVariables> </configuration> + <executions> + <execution> + <id>runnable-on-service-tests</id> + <configuration> + <excludes> + <exclude>org/apache/beam/sdk/transforms/ParDoLifecycleTest.java</exclude> + </excludes> + </configuration> + </execution> + </executions> </plugin> <!-- Run CheckStyle pass on transforms, as they are release in http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/29cbdceb/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoLifecycleTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoLifecycleTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoLifecycleTest.java new file mode 100644 index 0000000..272fea7 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoLifecycleTest.java @@ -0,0 +1,448 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.transforms; + +import static org.hamcrest.Matchers.anyOf; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; + +import org.apache.beam.sdk.testing.NeedsRunner; +import org.apache.beam.sdk.testing.RunnableOnService; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.values.PCollectionList; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; + +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Tests that {@link ParDo} exercises {@link DoFn} methods in the appropriate sequence. + */ +@RunWith(JUnit4.class) +public class ParDoLifecycleTest { + @Test + @Category(RunnableOnService.class) + public void testOldFnCallSequence() { + TestPipeline p = TestPipeline.create(); + PCollectionList.of(p.apply("Impolite", Create.of(1, 2, 4))) + .and(p.apply("Polite", Create.of(3, 5, 6, 7))) + .apply(Flatten.<Integer>pCollections()) + .apply(ParDo.of(new CallSequenceEnforcingOldFn<Integer>())); + + p.run(); + } + + @Test + @Category(RunnableOnService.class) + public void testOldFnCallSequenceMulti() { + TestPipeline p = TestPipeline.create(); + PCollectionList.of(p.apply("Impolite", Create.of(1, 2, 4))) + .and(p.apply("Polite", Create.of(3, 5, 6, 7))) + .apply(Flatten.<Integer>pCollections()) + .apply(ParDo.of(new CallSequenceEnforcingOldFn<Integer>()) + .withOutputTags(new TupleTag<Integer>() {}, TupleTagList.empty())); + + p.run(); + } + + private static class CallSequenceEnforcingOldFn<T> extends OldDoFn<T, T> { + private boolean setupCalled = false; + private int startBundleCalls = 0; + private int finishBundleCalls = 0; + private boolean teardownCalled = false; + + @Override + public void setup() { + assertThat("setup should not be called twice", setupCalled, is(false)); + assertThat("setup should be called before startBundle", startBundleCalls, equalTo(0)); + assertThat("setup should be called before finishBundle", finishBundleCalls, equalTo(0)); + assertThat("setup should be called before teardown", teardownCalled, is(false)); + setupCalled = true; + } + + @Override + public void startBundle(Context c) { + assertThat("setup should have been called", setupCalled, is(true)); + assertThat( + "Even number of startBundle and finishBundle calls in startBundle", + startBundleCalls, + equalTo(finishBundleCalls)); + assertThat("teardown should not have been called", teardownCalled, is(false)); + startBundleCalls++; + } + + @Override + public void processElement(ProcessContext c) throws Exception { + assertThat("startBundle should have been called", startBundleCalls, greaterThan(0)); + assertThat( + "there should be one startBundle call with no call to finishBundle", + startBundleCalls, + equalTo(finishBundleCalls + 1)); + assertThat("teardown should not have been called", teardownCalled, is(false)); + } + + @Override + public void finishBundle(Context c) { + assertThat("startBundle should have been called", startBundleCalls, greaterThan(0)); + assertThat( + "there should be one bundle that has been started but not finished", + startBundleCalls, + equalTo(finishBundleCalls + 1)); + assertThat("teardown should not have been called", teardownCalled, is(false)); + finishBundleCalls++; + } + + @Override + public void teardown() { + assertThat(setupCalled, is(true)); + assertThat(startBundleCalls, anyOf(equalTo(finishBundleCalls))); + assertThat(teardownCalled, is(false)); + teardownCalled = true; + } + } + + @Test + @Category(RunnableOnService.class) + public void testFnCallSequence() { + TestPipeline p = TestPipeline.create(); + PCollectionList.of(p.apply("Impolite", Create.of(1, 2, 4))) + .and(p.apply("Polite", Create.of(3, 5, 6, 7))) + .apply(Flatten.<Integer>pCollections()) + .apply(ParDo.of(new CallSequenceEnforcingFn<Integer>())); + + p.run(); + } + + @Test + @Category(RunnableOnService.class) + public void testFnCallSequenceMulti() { + TestPipeline p = TestPipeline.create(); + PCollectionList.of(p.apply("Impolite", Create.of(1, 2, 4))) + .and(p.apply("Polite", Create.of(3, 5, 6, 7))) + .apply(Flatten.<Integer>pCollections()) + .apply(ParDo.of(new CallSequenceEnforcingFn<Integer>()) + .withOutputTags(new TupleTag<Integer>() { + }, TupleTagList.empty())); + + p.run(); + } + + private static class CallSequenceEnforcingFn<T> extends DoFn<T, T> { + private boolean setupCalled = false; + private int startBundleCalls = 0; + private int finishBundleCalls = 0; + private boolean teardownCalled = false; + + @Setup + public void before() { + assertThat("setup should not be called twice", setupCalled, is(false)); + assertThat("setup should be called before startBundle", startBundleCalls, equalTo(0)); + assertThat("setup should be called before finishBundle", finishBundleCalls, equalTo(0)); + assertThat("setup should be called before teardown", teardownCalled, is(false)); + setupCalled = true; + } + + @StartBundle + public void begin(Context c) { + assertThat("setup should have been called", setupCalled, is(true)); + assertThat("Even number of startBundle and finishBundle calls in startBundle", + startBundleCalls, + equalTo(finishBundleCalls)); + assertThat("teardown should not have been called", teardownCalled, is(false)); + startBundleCalls++; + } + + @ProcessElement + public void process(ProcessContext c) throws Exception { + assertThat("startBundle should have been called", startBundleCalls, greaterThan(0)); + assertThat("there should be one startBundle call with no call to finishBundle", + startBundleCalls, + equalTo(finishBundleCalls + 1)); + assertThat("teardown should not have been called", teardownCalled, is(false)); + } + + @FinishBundle + public void end(Context c) { + assertThat("startBundle should have been called", startBundleCalls, greaterThan(0)); + assertThat("there should be one bundle that has been started but not finished", + startBundleCalls, + equalTo(finishBundleCalls + 1)); + assertThat("teardown should not have been called", teardownCalled, is(false)); + finishBundleCalls++; + } + + @Teardown + public void after() { + assertThat(setupCalled, is(true)); + assertThat(startBundleCalls, anyOf(equalTo(finishBundleCalls))); + assertThat(teardownCalled, is(false)); + teardownCalled = true; + } + } + + @Test + @Category(NeedsRunner.class) + public void testTeardownCalledAfterExceptionInSetup() { + TestPipeline p = TestPipeline.create(); + ExceptionThrowingOldFn fn = new ExceptionThrowingOldFn(MethodForException.SETUP); + p + .apply(Create.of(1, 2, 3)) + .apply(ParDo.of(fn)); + try { + p.run(); + fail("Pipeline should have failed with an exception"); + } catch (Exception e) { + assertThat( + "Function should have been torn down after exception", + ExceptionThrowingOldFn.teardownCalled.get(), + is(true)); + } + } + + + + @Test + @Category(NeedsRunner.class) + public void testTeardownCalledAfterExceptionInStartBundle() { + TestPipeline p = TestPipeline.create(); + ExceptionThrowingOldFn fn = new ExceptionThrowingOldFn(MethodForException.START_BUNDLE); + p + .apply(Create.of(1, 2, 3)) + .apply(ParDo.of(fn)); + try { + p.run(); + fail("Pipeline should have failed with an exception"); + } catch (Exception e) { + assertThat( + "Function should have been torn down after exception", + ExceptionThrowingOldFn.teardownCalled.get(), + is(true)); + } + } + + @Test + @Category(NeedsRunner.class) + public void testTeardownCalledAfterExceptionInProcessElement() { + TestPipeline p = TestPipeline.create(); + ExceptionThrowingOldFn fn = new ExceptionThrowingOldFn(MethodForException.PROCESS_ELEMENT); + p + .apply(Create.of(1, 2, 3)) + .apply(ParDo.of(fn)); + try { + p.run(); + fail("Pipeline should have failed with an exception"); + } catch (Exception e) { + assertThat( + "Function should have been torn down after exception", + ExceptionThrowingOldFn.teardownCalled.get(), + is(true)); + } + } + + @Test + @Category(NeedsRunner.class) + public void testTeardownCalledAfterExceptionInFinishBundle() { + TestPipeline p = TestPipeline.create(); + ExceptionThrowingOldFn fn = new ExceptionThrowingOldFn(MethodForException.FINISH_BUNDLE); + p + .apply(Create.of(1, 2, 3)) + .apply(ParDo.of(fn)); + try { + p.run(); + fail("Pipeline should have failed with an exception"); + } catch (Exception e) { + assertThat( + "Function should have been torn down after exception", + ExceptionThrowingOldFn.teardownCalled.get(), + is(true)); + } + } + + @Test + @Category(NeedsRunner.class) + public void testWithContextTeardownCalledAfterExceptionInSetup() { + TestPipeline p = TestPipeline.create(); + ExceptionThrowingOldFn fn = new ExceptionThrowingOldFn(MethodForException.SETUP); + p.apply(Create.of(1, 2, 3)).apply(ParDo.of(fn)); + try { + p.run(); + fail("Pipeline should have failed with an exception"); + } catch (Exception e) { + assertThat("Function should have been torn down after exception", + ExceptionThrowingOldFn.teardownCalled.get(), + is(true)); + } + } + + @Test + @Category(NeedsRunner.class) + public void testWithContextTeardownCalledAfterExceptionInStartBundle() { + TestPipeline p = TestPipeline.create(); + ExceptionThrowingOldFn fn = new ExceptionThrowingOldFn(MethodForException.START_BUNDLE); + p.apply(Create.of(1, 2, 3)).apply(ParDo.of(fn)); + try { + p.run(); + fail("Pipeline should have failed with an exception"); + } catch (Exception e) { + assertThat("Function should have been torn down after exception", + ExceptionThrowingOldFn.teardownCalled.get(), + is(true)); + } + } + + @Test + @Category(NeedsRunner.class) + public void testWithContextTeardownCalledAfterExceptionInProcessElement() { + TestPipeline p = TestPipeline.create(); + ExceptionThrowingOldFn fn = new ExceptionThrowingOldFn(MethodForException.PROCESS_ELEMENT); + p.apply(Create.of(1, 2, 3)).apply(ParDo.of(fn)); + try { + p.run(); + fail("Pipeline should have failed with an exception"); + } catch (Exception e) { + assertThat("Function should have been torn down after exception", + ExceptionThrowingOldFn.teardownCalled.get(), + is(true)); + } + } + + @Test + @Category(NeedsRunner.class) + public void testWithContextTeardownCalledAfterExceptionInFinishBundle() { + TestPipeline p = TestPipeline.create(); + ExceptionThrowingOldFn fn = new ExceptionThrowingOldFn(MethodForException.FINISH_BUNDLE); + p.apply(Create.of(1, 2, 3)).apply(ParDo.of(fn)); + try { + p.run(); + fail("Pipeline should have failed with an exception"); + } catch (Exception e) { + assertThat("Function should have been torn down after exception", + ExceptionThrowingOldFn.teardownCalled.get(), + is(true)); + } + } + + private static class ExceptionThrowingOldFn extends OldDoFn<Object, Object> { + static AtomicBoolean teardownCalled = new AtomicBoolean(false); + + private final MethodForException toThrow; + private boolean thrown; + + private ExceptionThrowingOldFn(MethodForException toThrow) { + this.toThrow = toThrow; + } + + @Override + public void setup() throws Exception { + throwIfNecessary(MethodForException.SETUP); + } + + @Override + public void startBundle(Context c) throws Exception { + throwIfNecessary(MethodForException.START_BUNDLE); + } + + @Override + public void processElement(ProcessContext c) throws Exception { + throwIfNecessary(MethodForException.PROCESS_ELEMENT); + } + + @Override + public void finishBundle(Context c) throws Exception { + throwIfNecessary(MethodForException.FINISH_BUNDLE); + } + + private void throwIfNecessary(MethodForException method) throws Exception { + if (toThrow == method && !thrown) { + thrown = true; + throw new Exception("Hasn't yet thrown"); + } + } + + @Override + public void teardown() { + if (!thrown) { + fail("Excepted to have a processing method throw an exception"); + } + teardownCalled.set(true); + } + } + + + private static class ExceptionThrowingFn extends DoFn<Object, Object> { + static AtomicBoolean teardownCalled = new AtomicBoolean(false); + + private final MethodForException toThrow; + private boolean thrown; + + private ExceptionThrowingFn(MethodForException toThrow) { + this.toThrow = toThrow; + } + + @Setup + public void before() throws Exception { + throwIfNecessary(MethodForException.SETUP); + } + + @StartBundle + public void preBundle(Context c) throws Exception { + throwIfNecessary(MethodForException.START_BUNDLE); + } + + @ProcessElement + public void perElement(ProcessContext c) throws Exception { + throwIfNecessary(MethodForException.PROCESS_ELEMENT); + } + + @FinishBundle + public void postBundle(Context c) throws Exception { + throwIfNecessary(MethodForException.FINISH_BUNDLE); + } + + private void throwIfNecessary(MethodForException method) throws Exception { + if (toThrow == method && !thrown) { + thrown = true; + throw new Exception("Hasn't yet thrown"); + } + } + + @Teardown + public void after() { + if (!thrown) { + fail("Excepted to have a processing method throw an exception"); + } + teardownCalled.set(true); + } + } + + private enum MethodForException { + SETUP, + START_BUNDLE, + PROCESS_ELEMENT, + FINISH_BUNDLE + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/29cbdceb/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java index 8460124..c384114 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java @@ -28,14 +28,11 @@ import static com.google.common.base.Preconditions.checkNotNull; import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThan; -import static org.hamcrest.Matchers.is; import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder; import static org.hamcrest.collection.IsIterableContainingInOrder.contains; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; -import static org.junit.Assert.fail; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.AtomicCoder; @@ -54,7 +51,6 @@ import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.common.ElementByteSizeObserver; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionList; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TimestampedValue; @@ -80,7 +76,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; /** * Tests for ParDo. @@ -1475,404 +1470,4 @@ public class ParDoTest implements Serializable { assertThat(displayData, includesDisplayDataFrom(fn)); assertThat(displayData, hasDisplayItem("fn", fn.getClass())); } - - @Test - @Category(RunnableOnService.class) - public void testFnCallSequence() { - TestPipeline p = TestPipeline.create(); - PCollectionList.of(p.apply("Impolite", Create.of(1, 2, 4))) - .and(p.apply("Polite", Create.of(3, 5, 6, 7))) - .apply(Flatten.<Integer>pCollections()) - .apply(ParDo.of(new CallSequenceEnforcingOldFn<Integer>())); - - p.run(); - } - - @Test - @Category(RunnableOnService.class) - public void testFnCallSequenceMulti() { - TestPipeline p = TestPipeline.create(); - PCollectionList.of(p.apply("Impolite", Create.of(1, 2, 4))) - .and(p.apply("Polite", Create.of(3, 5, 6, 7))) - .apply(Flatten.<Integer>pCollections()) - .apply(ParDo.of(new CallSequenceEnforcingOldFn<Integer>()) - .withOutputTags(new TupleTag<Integer>() {}, TupleTagList.empty())); - - p.run(); - } - - private static class CallSequenceEnforcingOldFn<T> extends OldDoFn<T, T> { - private boolean setupCalled = false; - private int startBundleCalls = 0; - private int finishBundleCalls = 0; - private boolean teardownCalled = false; - - @Override - public void setup() { - assertThat("setup should not be called twice", setupCalled, is(false)); - assertThat("setup should be called before startBundle", startBundleCalls, equalTo(0)); - assertThat("setup should be called before finishBundle", finishBundleCalls, equalTo(0)); - assertThat("setup should be called before teardown", teardownCalled, is(false)); - setupCalled = true; - } - - @Override - public void startBundle(Context c) { - assertThat("setup should have been called", setupCalled, is(true)); - assertThat( - "Even number of startBundle and finishBundle calls in startBundle", - startBundleCalls, - equalTo(finishBundleCalls)); - assertThat("teardown should not have been called", teardownCalled, is(false)); - startBundleCalls++; - } - - @Override - public void processElement(ProcessContext c) throws Exception { - assertThat("startBundle should have been called", startBundleCalls, greaterThan(0)); - assertThat( - "there should be one startBundle call with no call to finishBundle", - startBundleCalls, - equalTo(finishBundleCalls + 1)); - assertThat("teardown should not have been called", teardownCalled, is(false)); - } - - @Override - public void finishBundle(Context c) { - assertThat("startBundle should have been called", startBundleCalls, greaterThan(0)); - assertThat( - "there should be one bundle that has been started but not finished", - startBundleCalls, - equalTo(finishBundleCalls + 1)); - assertThat("teardown should not have been called", teardownCalled, is(false)); - finishBundleCalls++; - } - - @Override - public void teardown() { - assertThat(setupCalled, is(true)); - assertThat(startBundleCalls, anyOf(equalTo(finishBundleCalls))); - assertThat(teardownCalled, is(false)); - teardownCalled = true; - } - } - - @Test - @Category(RunnableOnService.class) - public void testFnWithContextCallSequence() { - TestPipeline p = TestPipeline.create(); - PCollectionList.of(p.apply("Impolite", Create.of(1, 2, 4))) - .and(p.apply("Polite", Create.of(3, 5, 6, 7))) - .apply(Flatten.<Integer>pCollections()) - .apply(ParDo.of(new CallSequenceEnforcingFn<Integer>())); - - p.run(); - } - - @Test - @Category(RunnableOnService.class) - public void testFnWithContextCallSequenceMulti() { - TestPipeline p = TestPipeline.create(); - PCollectionList.of(p.apply("Impolite", Create.of(1, 2, 4))) - .and(p.apply("Polite", Create.of(3, 5, 6, 7))) - .apply(Flatten.<Integer>pCollections()) - .apply(ParDo.of(new CallSequenceEnforcingFn<Integer>()) - .withOutputTags(new TupleTag<Integer>() { - }, TupleTagList.empty())); - - p.run(); - } - - private static class CallSequenceEnforcingFn<T> extends DoFn<T, T> { - private boolean setupCalled = false; - private int startBundleCalls = 0; - private int finishBundleCalls = 0; - private boolean teardownCalled = false; - - @Setup - public void before() { - assertThat("setup should not be called twice", setupCalled, is(false)); - assertThat("setup should be called before startBundle", startBundleCalls, equalTo(0)); - assertThat("setup should be called before finishBundle", finishBundleCalls, equalTo(0)); - assertThat("setup should be called before teardown", teardownCalled, is(false)); - setupCalled = true; - } - - @StartBundle - public void begin(Context c) { - assertThat("setup should have been called", setupCalled, is(true)); - assertThat("Even number of startBundle and finishBundle calls in startBundle", - startBundleCalls, - equalTo(finishBundleCalls)); - assertThat("teardown should not have been called", teardownCalled, is(false)); - startBundleCalls++; - } - - @ProcessElement - public void process(ProcessContext c) throws Exception { - assertThat("startBundle should have been called", startBundleCalls, greaterThan(0)); - assertThat("there should be one startBundle call with no call to finishBundle", - startBundleCalls, - equalTo(finishBundleCalls + 1)); - assertThat("teardown should not have been called", teardownCalled, is(false)); - } - - @FinishBundle - public void end(Context c) { - assertThat("startBundle should have been called", startBundleCalls, greaterThan(0)); - assertThat("there should be one bundle that has been started but not finished", - startBundleCalls, - equalTo(finishBundleCalls + 1)); - assertThat("teardown should not have been called", teardownCalled, is(false)); - finishBundleCalls++; - } - - @Teardown - public void after() { - assertThat(setupCalled, is(true)); - assertThat(startBundleCalls, anyOf(equalTo(finishBundleCalls))); - assertThat(teardownCalled, is(false)); - teardownCalled = true; - } - } - - @Test - @Category(NeedsRunner.class) - public void testTeardownCalledAfterExceptionInSetup() { - TestPipeline p = TestPipeline.create(); - ExceptionThrowingOldFn fn = new ExceptionThrowingOldFn(MethodForException.SETUP); - p - .apply(Create.of(1, 2, 3)) - .apply(ParDo.of(fn)); - try { - p.run(); - fail("Pipeline should have failed with an exception"); - } catch (Exception e) { - assertThat( - "Function should have been torn down after exception", - ExceptionThrowingOldFn.teardownCalled.get(), - is(true)); - } - } - - @Test - @Category(NeedsRunner.class) - public void testTeardownCalledAfterExceptionInStartBundle() { - TestPipeline p = TestPipeline.create(); - ExceptionThrowingOldFn fn = new ExceptionThrowingOldFn(MethodForException.START_BUNDLE); - p - .apply(Create.of(1, 2, 3)) - .apply(ParDo.of(fn)); - try { - p.run(); - fail("Pipeline should have failed with an exception"); - } catch (Exception e) { - assertThat( - "Function should have been torn down after exception", - ExceptionThrowingOldFn.teardownCalled.get(), - is(true)); - } - } - - @Test - @Category(NeedsRunner.class) - public void testTeardownCalledAfterExceptionInProcessElement() { - TestPipeline p = TestPipeline.create(); - ExceptionThrowingOldFn fn = new ExceptionThrowingOldFn(MethodForException.PROCESS_ELEMENT); - p - .apply(Create.of(1, 2, 3)) - .apply(ParDo.of(fn)); - try { - p.run(); - fail("Pipeline should have failed with an exception"); - } catch (Exception e) { - assertThat( - "Function should have been torn down after exception", - ExceptionThrowingOldFn.teardownCalled.get(), - is(true)); - } - } - - @Test - @Category(NeedsRunner.class) - public void testTeardownCalledAfterExceptionInFinishBundle() { - TestPipeline p = TestPipeline.create(); - ExceptionThrowingOldFn fn = new ExceptionThrowingOldFn(MethodForException.FINISH_BUNDLE); - p - .apply(Create.of(1, 2, 3)) - .apply(ParDo.of(fn)); - try { - p.run(); - fail("Pipeline should have failed with an exception"); - } catch (Exception e) { - assertThat( - "Function should have been torn down after exception", - ExceptionThrowingOldFn.teardownCalled.get(), - is(true)); - } - } - - @Test - @Category(NeedsRunner.class) - public void testWithContextTeardownCalledAfterExceptionInSetup() { - TestPipeline p = TestPipeline.create(); - ExceptionThrowingOldFn fn = new ExceptionThrowingOldFn(MethodForException.SETUP); - p.apply(Create.of(1, 2, 3)).apply(ParDo.of(fn)); - try { - p.run(); - fail("Pipeline should have failed with an exception"); - } catch (Exception e) { - assertThat("Function should have been torn down after exception", - ExceptionThrowingOldFn.teardownCalled.get(), - is(true)); - } - } - - @Test - @Category(NeedsRunner.class) - public void testWithContextTeardownCalledAfterExceptionInStartBundle() { - TestPipeline p = TestPipeline.create(); - ExceptionThrowingOldFn fn = new ExceptionThrowingOldFn(MethodForException.START_BUNDLE); - p.apply(Create.of(1, 2, 3)).apply(ParDo.of(fn)); - try { - p.run(); - fail("Pipeline should have failed with an exception"); - } catch (Exception e) { - assertThat("Function should have been torn down after exception", - ExceptionThrowingOldFn.teardownCalled.get(), - is(true)); - } - } - - @Test - @Category(NeedsRunner.class) - public void testWithContextTeardownCalledAfterExceptionInProcessElement() { - TestPipeline p = TestPipeline.create(); - ExceptionThrowingOldFn fn = new ExceptionThrowingOldFn(MethodForException.PROCESS_ELEMENT); - p.apply(Create.of(1, 2, 3)).apply(ParDo.of(fn)); - try { - p.run(); - fail("Pipeline should have failed with an exception"); - } catch (Exception e) { - assertThat("Function should have been torn down after exception", - ExceptionThrowingOldFn.teardownCalled.get(), - is(true)); - } - } - - @Test - @Category(NeedsRunner.class) - public void testWithContextTeardownCalledAfterExceptionInFinishBundle() { - TestPipeline p = TestPipeline.create(); - ExceptionThrowingOldFn fn = new ExceptionThrowingOldFn(MethodForException.FINISH_BUNDLE); - p.apply(Create.of(1, 2, 3)).apply(ParDo.of(fn)); - try { - p.run(); - fail("Pipeline should have failed with an exception"); - } catch (Exception e) { - assertThat("Function should have been torn down after exception", - ExceptionThrowingOldFn.teardownCalled.get(), - is(true)); - } - } - - private static class ExceptionThrowingOldFn extends OldDoFn<Object, Object> { - static AtomicBoolean teardownCalled = new AtomicBoolean(false); - - private final MethodForException toThrow; - private boolean thrown; - - private ExceptionThrowingOldFn(MethodForException toThrow) { - this.toThrow = toThrow; - } - - @Override - public void setup() throws Exception { - throwIfNecessary(MethodForException.SETUP); - } - - @Override - public void startBundle(Context c) throws Exception { - throwIfNecessary(MethodForException.START_BUNDLE); - } - - @Override - public void processElement(ProcessContext c) throws Exception { - throwIfNecessary(MethodForException.PROCESS_ELEMENT); - } - - @Override - public void finishBundle(Context c) throws Exception { - throwIfNecessary(MethodForException.FINISH_BUNDLE); - } - - private void throwIfNecessary(MethodForException method) throws Exception { - if (toThrow == method && !thrown) { - thrown = true; - throw new Exception("Hasn't yet thrown"); - } - } - - @Override - public void teardown() { - if (!thrown) { - fail("Excepted to have a processing method throw an exception"); - } - teardownCalled.set(true); - } - } - - - private static class ExceptionThrowingFn extends DoFn<Object, Object> { - static AtomicBoolean teardownCalled = new AtomicBoolean(false); - - private final MethodForException toThrow; - private boolean thrown; - - private ExceptionThrowingFn(MethodForException toThrow) { - this.toThrow = toThrow; - } - - @Setup - public void before() throws Exception { - throwIfNecessary(MethodForException.SETUP); - } - - @StartBundle - public void preBundle(Context c) throws Exception { - throwIfNecessary(MethodForException.START_BUNDLE); - } - - @ProcessElement - public void perElement(ProcessContext c) throws Exception { - throwIfNecessary(MethodForException.PROCESS_ELEMENT); - } - - @FinishBundle - public void postBundle(Context c) throws Exception { - throwIfNecessary(MethodForException.FINISH_BUNDLE); - } - - private void throwIfNecessary(MethodForException method) throws Exception { - if (toThrow == method && !thrown) { - thrown = true; - throw new Exception("Hasn't yet thrown"); - } - } - - @Teardown - public void after() { - if (!thrown) { - fail("Excepted to have a processing method throw an exception"); - } - teardownCalled.set(true); - } - } - - private enum MethodForException { - SETUP, - START_BUNDLE, - PROCESS_ELEMENT, - FINISH_BUNDLE - } }