Repository: beam Updated Branches: refs/heads/master 5f8cfa741 -> b3c36256e
Allow output from FinishBundle in DoFnTester Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/649994b3 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/649994b3 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/649994b3 Branch: refs/heads/master Commit: 649994b353afe28c917969609c7a1a47a4f39aaf Parents: 5f8cfa7 Author: Rune Fevang <[email protected]> Authored: Thu Jun 15 13:51:12 2017 +0200 Committer: Kenneth Knowles <[email protected]> Committed: Thu Jun 22 20:07:11 2017 -0700 ---------------------------------------------------------------------- .../apache/beam/sdk/transforms/DoFnTester.java | 16 ++-------- .../beam/sdk/transforms/DoFnTesterTest.java | 32 ++++++++++++++++++++ 2 files changed, 35 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/649994b3/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java index 8a03f3c..4da9a80 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java @@ -546,11 +546,6 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable { fn.super(); } - private void throwUnsupportedOutputFromBundleMethods() { - throw new UnsupportedOperationException( - "DoFnTester doesn't support output from bundle methods"); - } - @Override public PipelineOptions getPipelineOptions() { return options; @@ -559,12 +554,13 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable { @Override public void output( OutputT output, Instant timestamp, BoundedWindow window) { - throwUnsupportedOutputFromBundleMethods(); + output(mainOutputTag, output, timestamp, window); } @Override public <T> void output(TupleTag<T> tag, T output, Instant timestamp, BoundedWindow window) { - throwUnsupportedOutputFromBundleMethods(); + getMutableOutput(tag) + .add(ValueInSingleWindow.of(output, timestamp, window, PaneInfo.NO_FIRING)); } } @@ -642,12 +638,6 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable { getMutableOutput(tag) .add(ValueInSingleWindow.of(output, timestamp, element.getWindow(), element.getPane())); } - - private void throwUnsupportedOutputFromBundleMethods() { - throw new UnsupportedOperationException( - "DoFnTester doesn't support output from bundle methods"); - } - } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/649994b3/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java index 1bb71bb..5cb9e18 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java @@ -360,6 +360,38 @@ public class DoFnTesterTest { } } + @Test + public void testSupportsFinishBundleOutput() throws Exception { + for (DoFnTester.CloningBehavior cloning : DoFnTester.CloningBehavior.values()) { + try (DoFnTester<Integer, Integer> tester = DoFnTester.of(new BundleCounterDoFn())) { + tester.setCloningBehavior(cloning); + + assertThat(tester.processBundle(1, 2, 3, 4), contains(4)); + assertThat(tester.processBundle(5, 6, 7), contains(3)); + assertThat(tester.processBundle(8, 9), contains(2)); + } + } + } + + private static class BundleCounterDoFn extends DoFn<Integer, Integer> { + private int elements; + + @StartBundle + public void startBundle() { + elements = 0; + } + + @ProcessElement + public void processElement(ProcessContext c) { + elements++; + } + + @FinishBundle + public void finishBundle(FinishBundleContext c) { + c.output(elements, Instant.now(), GlobalWindow.INSTANCE); + } + } + private static class SideInputDoFn extends DoFn<Integer, Integer> { private final PCollectionView<Integer> value;
