lukecwik commented on a change in pull request #12488:
URL: https://github.com/apache/beam/pull/12488#discussion_r467358540
##########
File path:
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
##########
@@ -839,19 +845,20 @@ public ProcessContinuation process(
RestrictionTracker<OffsetRange, Long> tracker,
BundleFinalizer bundleFinalizer)
throws InterruptedException {
- if (wasFinalized.get()) {
+ if (WAS_FINALIZED.computeIfAbsent(uuid, (unused) -> new AtomicBoolean()).get()) {
+ tracker.tryClaim(tracker.currentRestriction().getFrom() + 1);
+ receiver.output(element);
// Claim beyond the end now that we know we have been finalized.
tracker.tryClaim(Long.MAX_VALUE);
- receiver.output(element);
return stop();
}
if (tracker.tryClaim(tracker.currentRestriction().getFrom() + 1)) {
bundleFinalizer.afterBundleCommit(
Instant.now().plus(Duration.standardSeconds(MAX_ATTEMPTS)),
- () -> wasFinalized.set(true));
+ () -> WAS_FINALIZED.computeIfAbsent(uuid, (unused) -> new AtomicBoolean()).set(true));
// We sleep here instead of setting a resume time since the resume time doesn't need to
// be honored.
- sleep(1000L); // 1 second
+ sleep(100L);
Review comment:
We also have an effective 300-second timeout here, so I don't believe
it should flake. If it does, I'll have to find a different way to
design this test.
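
For readers following the diff, here is a minimal sketch of the pattern the new lines rely on: a static map keyed by a UUID, with computeIfAbsent lazily creating each AtomicBoolean flag. The class name FinalizationFlags and the helper flagFor are hypothetical and do not appear in the PR. The map type is assumed to be a ConcurrentHashMap, and the Runnable only stands in for the callback passed to bundleFinalizer.afterBundleCommit. Presumably the point of a static map is that the flag remains reachable from any copy of the DoFn in the same JVM, but that motivation is an assumption, not something stated in the comment.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical helper (not part of Beam): per-key flags kept in a static map. */
public class FinalizationFlags {
  // Static, so every object in this JVM that knows the key sees the same flag.
  // Assumption: the real WAS_FINALIZED map is a concurrent map like this one.
  private static final Map<UUID, AtomicBoolean> WAS_FINALIZED = new ConcurrentHashMap<>();

  /** Returns the flag for the key, creating it (initially false) on first use. */
  public static AtomicBoolean flagFor(UUID uuid) {
    return WAS_FINALIZED.computeIfAbsent(uuid, unused -> new AtomicBoolean());
  }

  public static void main(String[] args) {
    UUID uuid = UUID.randomUUID();

    // Before the callback runs, the lazily created flag is false.
    System.out.println(flagFor(uuid).get()); // prints "false"

    // Stand-in for the finalization callback: it looks the flag up again by key.
    Runnable callback = () -> flagFor(uuid).set(true);
    callback.run();

    // A later lookup with the same key observes the update.
    System.out.println(flagFor(uuid).get()); // prints "true"

    // A different key gets its own, independent flag.
    System.out.println(flagFor(UUID.randomUUID()).get()); // prints "false"
  }
}
```

Because both the reader and the callback call computeIfAbsent, neither side needs to know which of them runs first: whichever arrives first creates the flag and the other reuses it.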