Add DirectRunner Reuse Test. Two calls to run using the Direct Runner should be independent and succeed independently.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4546fd9c Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4546fd9c Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4546fd9c Branch: refs/heads/master Commit: 4546fd9c5e073eb33787faa302b8695dfd6e04aa Parents: 7585cfc Author: Thomas Groh <[email protected]> Authored: Fri Aug 5 09:58:59 2016 -0700 Committer: Thomas Groh <[email protected]> Committed: Fri Aug 5 10:04:21 2016 -0700 ---------------------------------------------------------------------- .../beam/runners/direct/DirectRunnerTest.java | 31 ++++++++++++++++++++ 1 file changed, 31 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4546fd9c/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java index 29dea32..1e73ec0 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java @@ -109,6 +109,37 @@ public class DirectRunnerTest implements Serializable { result.awaitCompletion(); } + @Test + public void reusePipelineSucceeds() throws Throwable { + Pipeline p = getPipeline(); + + PCollection<KV<String, Long>> counts = + p.apply(Create.of("foo", "bar", "foo", "baz", "bar", "foo")) + .apply(MapElements.via(new SimpleFunction<String, String>() { + @Override + public String apply(String input) { + return input; + } + })) + .apply(Count.<String>perElement()); + PCollection<String> countStrs = + counts.apply(MapElements.via(new 
SimpleFunction<KV<String, Long>, String>() { + @Override + public String apply(KV<String, Long> input) { + String str = String.format("%s: %s", input.getKey(), input.getValue()); + return str; + } + })); + + PAssert.that(countStrs).containsInAnyOrder("baz: 1", "bar: 2", "foo: 3"); + + DirectPipelineResult result = ((DirectPipelineResult) p.run()); + result.awaitCompletion(); + + DirectPipelineResult otherResult = ((DirectPipelineResult) p.run()); + otherResult.awaitCompletion(); + } + @Test(timeout = 5000L) public void byteArrayCountShouldSucceed() { Pipeline p = getPipeline();
