Repository: beam Updated Branches: refs/heads/master de36e8398 -> cc8e0b9df
Test waitUntilFinish(Duration) in the DirectRunner Ensures that the call to "waitUntilFinish(Duration)" terminates before the Pipeline completes. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b4541a18 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b4541a18 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b4541a18 Branch: refs/heads/master Commit: b4541a18cf447fed2b2150a99be1d892e1f8e358 Parents: de36e83 Author: Thomas Groh <tg...@google.com> Authored: Mon Apr 3 10:12:58 2017 -0700 Committer: Ismaël Mejía <ieme...@apache.org> Committed: Tue Apr 4 09:45:19 2017 +0200 ---------------------------------------------------------------------- .../beam/runners/direct/DirectRunnerTest.java | 27 ++++++++++++++++++++ 1 file changed, 27 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/b4541a18/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java index e601fcf..f1c0eb2 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java @@ -42,6 +42,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.PipelineResult.State; import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; @@ -277,6 +278,32 @@ public class DirectRunnerTest implements 
Serializable { } @Test + public void waitUntilFinishTimeout() throws Exception { + DirectOptions options = PipelineOptionsFactory.as(DirectOptions.class); + options.setBlockOnRun(false); + options.setRunner(DirectRunner.class); + Pipeline p = Pipeline.create(options); + p.apply(Create.of(1L)) + .apply( + ParDo.of( + new DoFn<Long, Long>() { + @ProcessElement + public void hang(ProcessContext context) throws InterruptedException { + // Hangs "forever" + Thread.sleep(Long.MAX_VALUE); + } + })); + PipelineResult result = p.run(); + // The pipeline should never complete; + assertThat(result.getState(), is(State.RUNNING)); + // Must time out, otherwise this test will never complete + result.waitUntilFinish(Duration.millis(1L)); + assertThat(result.getState(), is(State.RUNNING)); + + result.cancel(); + } + + @Test public void transformDisplayDataExceptionShouldFail() { DoFn<Integer, Integer> brokenDoFn = new DoFn<Integer, Integer>() { @ProcessElement