Demonstrate that the DirectRunner runs per-call. Add a field that is modified per output, which should occur twice.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/65f9076d Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/65f9076d Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/65f9076d Branch: refs/heads/gearpump-runner Commit: 65f9076d654be02cbdc07442d008f6c5245d1ab5 Parents: 6da92ad Author: Thomas Groh <[email protected]> Authored: Wed Aug 10 11:29:38 2016 -0700 Committer: bchambers <[email protected]> Committed: Wed Aug 10 15:54:09 2016 -0700 ---------------------------------------------------------------------- .../apache/beam/runners/direct/DirectRunnerTest.java | 15 +++++++++++++++ 1 file changed, 15 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/65f9076d/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java index 1e73ec0..ddce458 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java @@ -17,6 +17,7 @@ */ package org.apache.beam.runners.direct; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; @@ -32,6 +33,7 @@ import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Flatten; import 
org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.OldDoFn; @@ -58,6 +60,7 @@ import java.io.Serializable; import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; /** * Tests for basic {@link DirectRunner} functionality. @@ -109,10 +112,12 @@ public class DirectRunnerTest implements Serializable { result.awaitCompletion(); } + private static AtomicInteger changed; @Test public void reusePipelineSucceeds() throws Throwable { Pipeline p = getPipeline(); + changed = new AtomicInteger(0); PCollection<KV<String, Long>> counts = p.apply(Create.of("foo", "bar", "foo", "baz", "bar", "foo")) .apply(MapElements.via(new SimpleFunction<String, String>() { @@ -131,6 +136,14 @@ public class DirectRunnerTest implements Serializable { } })); + counts.apply(ParDo.of(new DoFn<KV<String, Long>, Void>() { + @ProcessElement + public void updateChanged(ProcessContext c) { + changed.getAndIncrement(); + } + })); + + PAssert.that(countStrs).containsInAnyOrder("baz: 1", "bar: 2", "foo: 3"); DirectPipelineResult result = ((DirectPipelineResult) p.run()); @@ -138,6 +151,8 @@ public class DirectRunnerTest implements Serializable { DirectPipelineResult otherResult = ((DirectPipelineResult) p.run()); otherResult.awaitCompletion(); + + assertThat("Each element should have been processed twice", changed.get(), equalTo(6)); } @Test(timeout = 5000L)
