amaliujia commented on a change in pull request #12488:
URL: https://github.com/apache/beam/pull/12488#discussion_r467331551
##########
File path:
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
##########
@@ -1499,6 +1504,146 @@ public void populateDisplayData(Builder builder) {
}
}
+ @RunWith(JUnit4.class)
+ public static class BundleFinalizationTests extends SharedTestBase
implements Serializable {
+ private abstract static class BundleFinalizingDoFn extends DoFn<KV<String,
Long>, String> {
+ private static final long MAX_ATTEMPTS = 3000;
+ // We use the UUID to uniquely identify this DoFn in case this test is
run with
+ // other tests in the same JVM.
+ private static final Map<UUID, AtomicBoolean> WAS_FINALIZED = new
HashMap();
+ private final UUID uuid = UUID.randomUUID();
+
+ public void testFinalization(BundleFinalizer bundleFinalizer,
OutputReceiver<String> output)
+ throws Exception {
+ if (WAS_FINALIZED.computeIfAbsent(uuid, (unused) -> new
AtomicBoolean()).get()) {
+ output.output("bundle was finalized");
+ return;
+ }
+ bundleFinalizer.afterBundleCommit(
+ Instant.now().plus(Duration.standardSeconds(MAX_ATTEMPTS)),
+ () -> WAS_FINALIZED.computeIfAbsent(uuid, (unused) -> new
AtomicBoolean()).set(true));
+ // We sleep here to give time for the runner to perform any prior
callbacks.
+ sleep(100L);
Review comment:
nit: I am not sure if there are other ways, but such `sleep` might be a
source of flakiness. E.g. prior callbacks not finish after 100L.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]