lukecwik commented on a change in pull request #11275: [BEAM-9648]: DirectRunner should return null on timeout
URL: https://github.com/apache/beam/pull/11275#discussion_r405807176
##########
File path: runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
##########
@@ -260,6 +260,11 @@ public State waitUntilFinish(Duration duration) throws Exception {
}
}
}
+
+ if (Instant.now().isAfter(completionTime)) {
+ return null;
+ }
Review comment:
This introduces a race: the while loop above can exit for a reason other than a timeout, but if `completionTime` passes before this check runs, we would still report it as a timeout.
We could restructure the loop above so that it only exits on timeout and every non-timeout return happens inside the loop, with something like:
```java
while (Instant.now().isBefore(completionTime)) {
  // Get an update; don't block forever if another thread has handled it. The call to poll will
  // wait the entire timeout; this call primarily exists to relinquish any core.
  VisibleExecutorUpdate update = visibleUpdates.tryNext(Duration.millis(25L));
  if (pipelineState.get().isTerminal()
      || (update != null && isTerminalStateUpdate(update))) {
    // there are no updates to process and no updates will ever be published because the
    // executor is shutdown OR there has been an update and the update is terminal
    return pipelineState.get();
  } else if (update != null && update.thrown.isPresent()) {
    Throwable thrown = update.thrown.get();
    if (thrown instanceof Exception) {
      throw (Exception) thrown;
    } else if (thrown instanceof Error) {
      throw (Error) thrown;
    } else {
      throw new Exception("Unknown Type of Throwable", thrown);
    }
  }
}
return null;
```
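To make the invariant concrete outside the DirectRunner, here is a minimal, self-contained sketch of the same loop shape. It is an illustration under stated assumptions, not Beam code: `DeadlineWait`, its `State` enum and `publish` method are hypothetical, a `LinkedBlockingQueue` stands in for `visibleUpdates`, and `java.time` replaces the Joda `Duration`/`Instant` used above. Because every non-timeout exit returns from inside the loop, falling through to `return null` can only mean the deadline passed.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: a deadline loop where only a timeout falls through to "return null". */
public class DeadlineWait {
  enum State { RUNNING, DONE, FAILED }

  // Hypothetical stand-in for the executor's stream of state updates.
  private final BlockingQueue<State> updates = new LinkedBlockingQueue<>();

  void publish(State s) {
    updates.add(s);
  }

  /** Returns the terminal state, or null if the deadline passes first. */
  State waitUntilFinish(Duration timeout) throws InterruptedException {
    Instant completionTime = Instant.now().plus(timeout);
    while (Instant.now().isBefore(completionTime)) {
      // Short bounded poll so the loop re-checks the deadline regularly.
      State update = updates.poll(25, TimeUnit.MILLISECONDS);
      if (update == State.DONE || update == State.FAILED) {
        // Every non-timeout exit returns from inside the loop...
        return update;
      }
      // null (no update yet) or RUNNING: keep waiting.
    }
    // ...so reaching this line can only mean the deadline passed.
    return null;
  }

  public static void main(String[] args) throws InterruptedException {
    DeadlineWait w = new DeadlineWait();
    w.publish(State.RUNNING);
    w.publish(State.DONE);
    System.out.println(w.waitUntilFinish(Duration.ofSeconds(1))); // DONE
    System.out.println(w.waitUntilFinish(Duration.ofMillis(100))); // null
  }
}
```

The short 25 ms poll mirrors `tryNext(Duration.millis(25L))` above: it bounds how long the thread can sleep past the deadline, while the deadline check itself lives only in the loop condition.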