Repository: beam
Updated Branches:
  refs/heads/master 5f8cfa741 -> b3c36256e


Allow output from FinishBundle in DoFnTester


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/649994b3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/649994b3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/649994b3

Branch: refs/heads/master
Commit: 649994b353afe28c917969609c7a1a47a4f39aaf
Parents: 5f8cfa7
Author: Rune Fevang <[email protected]>
Authored: Thu Jun 15 13:51:12 2017 +0200
Committer: Kenneth Knowles <[email protected]>
Committed: Thu Jun 22 20:07:11 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/transforms/DoFnTester.java  | 16 ++--------
 .../beam/sdk/transforms/DoFnTesterTest.java     | 32 ++++++++++++++++++++
 2 files changed, 35 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/649994b3/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index 8a03f3c..4da9a80 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -546,11 +546,6 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
       fn.super();
     }
 
-    private void throwUnsupportedOutputFromBundleMethods() {
-      throw new UnsupportedOperationException(
-          "DoFnTester doesn't support output from bundle methods");
-    }
-
     @Override
     public PipelineOptions getPipelineOptions() {
       return options;
@@ -559,12 +554,13 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
     @Override
     public void output(
         OutputT output, Instant timestamp, BoundedWindow window) {
-      throwUnsupportedOutputFromBundleMethods();
+      output(mainOutputTag, output, timestamp, window);
     }
 
     @Override
     public <T> void output(TupleTag<T> tag, T output, Instant timestamp, BoundedWindow window) {
-      throwUnsupportedOutputFromBundleMethods();
+      getMutableOutput(tag)
+          .add(ValueInSingleWindow.of(output, timestamp, window, PaneInfo.NO_FIRING));
     }
   }
 
@@ -642,12 +638,6 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
       getMutableOutput(tag)
           .add(ValueInSingleWindow.of(output, timestamp, element.getWindow(), element.getPane()));
     }
-
-    private void throwUnsupportedOutputFromBundleMethods() {
-      throw new UnsupportedOperationException(
-          "DoFnTester doesn't support output from bundle methods");
-    }
-
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/649994b3/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
index 1bb71bb..5cb9e18 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
@@ -360,6 +360,38 @@ public class DoFnTesterTest {
     }
   }
 
+  @Test
+  public void testSupportsFinishBundleOutput() throws Exception {
+    for (DoFnTester.CloningBehavior cloning : DoFnTester.CloningBehavior.values()) {
+      try (DoFnTester<Integer, Integer> tester = DoFnTester.of(new BundleCounterDoFn())) {
+        tester.setCloningBehavior(cloning);
+
+        assertThat(tester.processBundle(1, 2, 3, 4), contains(4));
+        assertThat(tester.processBundle(5, 6, 7), contains(3));
+        assertThat(tester.processBundle(8, 9), contains(2));
+      }
+    }
+  }
+
+  private static class BundleCounterDoFn extends DoFn<Integer, Integer> {
+    private int elements;
+
+    @StartBundle
+    public void startBundle() {
+      elements = 0;
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      elements++;
+    }
+
+    @FinishBundle
+    public void finishBundle(FinishBundleContext c) {
+      c.output(elements, Instant.now(), GlobalWindow.INSTANCE);
+    }
+  }
+
   private static class SideInputDoFn extends DoFn<Integer, Integer> {
     private final PCollectionView<Integer> value;
 

Reply via email to