StefanRRichter commented on a change in pull request #7568: [FLINK-11417] Make 
access to ExecutionGraph single threaded from JobMaster main thread
URL: https://github.com/apache/flink/pull/7568#discussion_r250978935
 
 

 ##########
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ##########
 @@ -420,13 +429,23 @@ public void setInitialState(@Nullable 
JobManagerTaskRestore taskRestore) {
                                allPreviousExecutionGraphAllocationIds,
                                allocationTimeout);
 
-                       // IMPORTANT: We have to use the synchronous handle 
operation (direct executor) here so
-                       // that we directly deploy the tasks if the slot 
allocation future is completed. This is
-                       // necessary for immediate deployment.
-                       final CompletableFuture<Void> deploymentFuture = 
allocationFuture.thenAccept(
-                               (FutureConsumerWithException<Execution, 
Exception>) value -> deploy());
+                       final ComponentMainThreadExecutor mainThreadExecutor =
+                               
vertex.getExecutionGraph().getJobMasterMainThreadExecutor();
+                       FutureConsumerWithException<Execution, Exception> 
deployment = value -> deploy();
+                       final CompletableFuture<Void> deploymentFuture;
+                       if (allocationFuture.isDone()) {
+                               deploymentFuture = 
allocationFuture.thenAccept(deployment);
+                       } else if (queued) {
+                               deploymentFuture = 
allocationFuture.thenAcceptAsync(
+                                       deployment, mainThreadExecutor);
+                       } else {
+                               deploymentFuture = 
FutureUtils.completedExceptionally(
+                                       new IllegalArgumentException("The slot 
allocation future has not been completed yet."));
+                       }
 
-                       deploymentFuture.whenComplete(
+                       // Normal completion of this future always happens in 
the main thread, but we need to complete async for the
+                       // exceptional cases that can complete in a different 
thread, e.g. a timeout.
+                       deploymentFuture.whenCompleteAsync(
 
 Review comment:
   We can complete exceptionally in a different thread when a timeout happens 
in the allocation. What would you suggest as the best way to ensure completion 
in the main thread, given how `FutureUtils.orTimeout(...)` works? (e.g. 
`SlotPool`, L 683)

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to