StefanRRichter commented on a change in pull request #7568: [FLINK-11417] Make
access to ExecutionGraph single threaded from JobMaster main thread
URL: https://github.com/apache/flink/pull/7568#discussion_r250978935
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
##########
@@ -420,13 +429,23 @@ public void setInitialState(@Nullable JobManagerTaskRestore taskRestore) {
         allPreviousExecutionGraphAllocationIds,
         allocationTimeout);
-    // IMPORTANT: We have to use the synchronous handle operation (direct executor) here so
-    // that we directly deploy the tasks if the slot allocation future is completed. This is
-    // necessary for immediate deployment.
-    final CompletableFuture<Void> deploymentFuture = allocationFuture.thenAccept(
-        (FutureConsumerWithException<Execution, Exception>) value -> deploy());
+    final ComponentMainThreadExecutor mainThreadExecutor =
+        vertex.getExecutionGraph().getJobMasterMainThreadExecutor();
+    FutureConsumerWithException<Execution, Exception> deployment = value -> deploy();
+    final CompletableFuture<Void> deploymentFuture;
+    if (allocationFuture.isDone()) {
+        deploymentFuture = allocationFuture.thenAccept(deployment);
+    } else if (queued) {
+        deploymentFuture = allocationFuture.thenAcceptAsync(
+            deployment, mainThreadExecutor);
+    } else {
+        deploymentFuture = FutureUtils.completedExceptionally(
+            new IllegalArgumentException("The slot allocation future has not been completed yet."));
+    }
-    deploymentFuture.whenComplete(
+    // Normal completion of this future always happens in the main thread, but we need to complete async for the
+    // exceptional cases that can complete in a different thread, e.g. a timeout.
+    deploymentFuture.whenCompleteAsync(
Review comment:
The future can complete exceptionally in a different thread when the slot
allocation times out. Given how `FutureUtils.orTimeout(...)` works (e.g.
`SlotPool`, L 683), what would you suggest as the best way to ensure
completion in the main thread?
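
To illustrate the concern, here is a minimal JDK-only sketch. It does not use Flink's `FutureUtils.orTimeout(...)` or `ComponentMainThreadExecutor`; the class name, the thread names, and the single-threaded executor standing in for the JobMaster main thread are all assumptions made for illustration. A timer thread fails the allocation future, and `whenCompleteAsync` with an explicit executor moves the callback back onto the stand-in main thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class MainThreadCompletionSketch {

    /** Returns the name of the thread that ran the completion callback. */
    static String callbackThreadName() throws Exception {
        // Hypothetical stand-in for the JobMaster main thread executor.
        ExecutorService mainThread = Executors.newSingleThreadExecutor(
                r -> new Thread(r, "main-thread-sketch"));
        // Hypothetical stand-in for the timer that fires allocation timeouts.
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor(
                r -> new Thread(r, "timeout-timer"));
        try {
            CompletableFuture<String> allocation = new CompletableFuture<>();

            // The timeout completes the future exceptionally from the timer thread,
            // i.e. outside the main thread.
            timer.schedule(
                    () -> allocation.completeExceptionally(
                            new TimeoutException("allocation timed out")),
                    50, TimeUnit.MILLISECONDS);

            // Passing the executor explicitly makes the callback run on the
            // stand-in main thread, whichever thread completed the future.
            CompletableFuture<String> observed = new CompletableFuture<>();
            allocation.whenCompleteAsync(
                    (slot, failure) -> observed.complete(Thread.currentThread().getName()),
                    mainThread);

            return observed.get(5, TimeUnit.SECONDS);
        } finally {
            timer.shutdownNow();
            mainThread.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(callbackThreadName()); // prints "main-thread-sketch"
    }
}
```

With plain `whenComplete` instead, a callback registered before the timeout fires would run on the `timeout-timer` thread, which is the situation the comment in the diff is guarding against.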