[ 
https://issues.apache.org/jira/browse/BEAM-10656?focusedWorklogId=468109&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-468109
 ]

ASF GitHub Bot logged work on BEAM-10656:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 07/Aug/20 23:18
            Start Date: 07/Aug/20 23:18
    Worklog Time Spent: 10m 
      Work Description: lukecwik commented on a change in pull request #12488:
URL: https://github.com/apache/beam/pull/12488#discussion_r467324258



##########
File path: 
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
##########
@@ -1499,6 +1504,146 @@ public void populateDisplayData(Builder builder) {
     }
   }
 
+  @RunWith(JUnit4.class)
+  public static class BundleFinalizationTests extends SharedTestBase 
implements Serializable {
+    private abstract static class BundleFinalizingDoFn extends DoFn<KV<String, 
Long>, String> {
+      private static final long MAX_ATTEMPTS = 3000;
+      // We use the UUID to uniquely identify this DoFn in case this test is 
run with
+      // other tests in the same JVM.
+      private static final Map<UUID, AtomicBoolean> WAS_FINALIZED = new 
HashMap();
+      private final UUID uuid = UUID.randomUUID();
+
+      public void testFinalization(BundleFinalizer bundleFinalizer, 
OutputReceiver<String> output)
+          throws Exception {
+        if (WAS_FINALIZED.computeIfAbsent(uuid, (unused) -> new 
AtomicBoolean()).get()) {
+          output.output("bundle was finalized");
+          return;
+        }
+        bundleFinalizer.afterBundleCommit(
+            Instant.now().plus(Duration.standardSeconds(MAX_ATTEMPTS)),
+            () -> WAS_FINALIZED.computeIfAbsent(uuid, (unused) -> new 
AtomicBoolean()).set(true));
+        // We sleep here to give time for the runner to perform any prior 
callbacks.
+        sleep(100L);
+      }
+    }
+
+    private static class BasicBundleFinalizingDoFn extends 
BundleFinalizingDoFn {
+      @ProcessElement
+      public void processElement(BundleFinalizer bundleFinalizer, 
OutputReceiver<String> output)
+          throws Exception {
+        testFinalization(bundleFinalizer, output);
+      }
+    }
+
+    private static class BundleFinalizerOutputChecker
+        implements SerializableFunction<Iterable<String>, Void> {
+      @Override
+      public Void apply(Iterable<String> input) {
+        assertTrue(
+            "Expected to have received one callback enabling output to be 
produced but received none.",

Review comment:
       Done




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 468109)
    Time Spent: 3h 10m  (was: 3h)

> Java Direct runner supports Bundle Finalization
> -----------------------------------------------
>
>                 Key: BEAM-10656
>                 URL: https://issues.apache.org/jira/browse/BEAM-10656
>             Project: Beam
>          Issue Type: New Feature
>          Components: runner-direct
>            Reporter: Luke Cwik
>            Assignee: Luke Cwik
>            Priority: P1
>          Time Spent: 3h 10m
>  Remaining Estimate: 0h
>
> Add support for the direct runner to invoke bundle finalization callbacks 
> within DoFns.
> More details about the semantics within: 
> https://s.apache.org/beam-finalizing-bundles



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to