tillrohrmann commented on a change in pull request #7568: [FLINK-11417] Make 
access to ExecutionGraph single threaded from JobMaster main thread
URL: https://github.com/apache/flink/pull/7568#discussion_r250614378
 
 

 ##########
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ##########
 @@ -420,13 +429,23 @@ public void setInitialState(@Nullable 
JobManagerTaskRestore taskRestore) {
                                allPreviousExecutionGraphAllocationIds,
                                allocationTimeout);
 
-                       // IMPORTANT: We have to use the synchronous handle 
operation (direct executor) here so
-                       // that we directly deploy the tasks if the slot 
allocation future is completed. This is
-                       // necessary for immediate deployment.
-                       final CompletableFuture<Void> deploymentFuture = 
allocationFuture.thenAccept(
-                               (FutureConsumerWithException<Execution, 
Exception>) value -> deploy());
+                       final ComponentMainThreadExecutor mainThreadExecutor =
+                               
vertex.getExecutionGraph().getJobMasterMainThreadExecutor();
+                       FutureConsumerWithException<Execution, Exception> 
deployment = value -> deploy();
+                       final CompletableFuture<Void> deploymentFuture;
+                       if (allocationFuture.isDone()) {
+                               deploymentFuture = 
allocationFuture.thenAccept(deployment);
+                       } else if (queued) {
+                               deploymentFuture = 
allocationFuture.thenAcceptAsync(
+                                       deployment, mainThreadExecutor);
+                       } else {
+                               deploymentFuture = 
FutureUtils.completedExceptionally(
+                                       new IllegalArgumentException("The slot 
allocation future has not been completed yet."));
+                       }
 
-                       deploymentFuture.whenComplete(
+                       // Normal completion of this future always happens in 
the main thread, but we need to complete async for the
+                       // exceptional cases that can complete in a different 
thread, e.g. a timeout.
+                       deploymentFuture.whenCompleteAsync(
 
 Review comment:
   From where does the concurrency arise? Is it the `allocationFuture`? If yes, 
can't we guarantee that this allocationFuture is always being completed by the 
main thread executor? Then we would not need to make the case distinction above 
and we could call here `whenComplete`.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to