dmvk commented on code in PR #22506:
URL: https://github.com/apache/flink/pull/22506#discussion_r1185081996
##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java:
##########
@@ -473,26 +500,50 @@ public CompletableFuture<Acknowledge> cancel(Time
timeout) {
@Override
public CompletableFuture<Acknowledge> updateTaskExecutionState(
final TaskExecutionState taskExecutionState) {
- FlinkException taskExecutionException;
+ checkNotNull(taskExecutionState, "taskExecutionState");
+ // Use the main/caller thread for all updates to make sure they are
processed in order.
+ // (MainThreadExecutor i.e., the akka thread pool does not guarantee
that)
+ // Only detach for a FAILED state update that is terminal and may
perform io heavy labeling.
+ if
(ExecutionState.FAILED.equals(taskExecutionState.getExecutionState())) {
+ return labelFailure(taskExecutionState)
+ .thenApplyAsync(
+ taskStateWithLabels -> {
+ try {
+ return
doUpdateTaskExecutionState(taskStateWithLabels);
+ } catch (FlinkException e) {
+ throw new CompletionException(e);
+ }
+ },
+ getMainThreadExecutor());
+ }
try {
- checkNotNull(taskExecutionState, "taskExecutionState");
+ return CompletableFuture.completedFuture(
+ doUpdateTaskExecutionState(taskExecutionState));
+ } catch (FlinkException e) {
+ return FutureUtils.completedExceptionally(e);
+ }
+ }
+ private Acknowledge doUpdateTaskExecutionState(final TaskExecutionState
taskExecutionState)
+ throws FlinkException {
+ @Nullable FlinkException taskExecutionException;
+ try {
if (schedulerNG.updateTaskExecutionState(taskExecutionState)) {
Review Comment:
That's a good catch; the most straightforward way to address this would be
to change the listener to go through JobMaster#updateTaskExecutionState instead
so we have a shared (semi-asynchronous) code path for all execution state
transitions.
`ExecutionDeploymentReconciliationHandler#onMissingDeploymentsOf` seems to
have the same problem (and the fix could most likely be along the same lines).
WDYT?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]