tillrohrmann commented on a change in pull request #15251:
URL: https://github.com/apache/flink/pull/15251#discussion_r597594777



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
##########
@@ -713,58 +698,97 @@ public void goToFinished(ArchivedExecutionGraph archivedExecutionGraph) {
 
     @Override
     public void goToCreatingExecutionGraph() {
-        CompletableFuture<ExecutionGraph> executionGraphFuture;
+        final CompletableFuture<CreatingExecutionGraph.ExecutionGraphWithVertexParallelism>
+                executionGraphWithAvailableResourcesFuture =
+                        createExecutionGraphWithAvailableResourcesAsync();
+
+        transitionToState(
+                new CreatingExecutionGraph.Factory(
+                        this, executionGraphWithAvailableResourcesFuture, LOG));
+    }
+
+    private CompletableFuture<CreatingExecutionGraph.ExecutionGraphWithVertexParallelism>
+            createExecutionGraphWithAvailableResourcesAsync() {
+        final JobGraph adjustedJobGraph;
+        final VertexParallelism vertexParallelism;
 
         try {
-            final ExecutionGraph executionGraph = createExecutionGraphWithAvailableResources();
-            executionGraphFuture = CompletableFuture.completedFuture(executionGraph);
+            vertexParallelism = determineParallelism(slotAllocator);
+
+            adjustedJobGraph = jobInformation.copyJobGraph();
+            for (JobVertex vertex : adjustedJobGraph.getVertices()) {
+                vertex.setParallelism(vertexParallelism.getParallelism(vertex.getID()));
+            }
         } catch (Exception exception) {
-            executionGraphFuture = FutureUtils.completedExceptionally(exception);
+            return FutureUtils.completedExceptionally(exception);
         }
 
-        transitionToState(new CreatingExecutionGraph.Factory(this, executionGraphFuture, LOG));
+        return createExecutionGraphAndRestoreStateAsync(adjustedJobGraph)
+                .thenApply(
+                        executionGraph ->
+                                CreatingExecutionGraph.ExecutionGraphWithVertexParallelism.create(
+                                        executionGraph, vertexParallelism));
     }
 
-    ExecutionGraph createExecutionGraphWithAvailableResources() throws Exception {
-        final ParallelismAndResourceAssignments parallelismAndResourceAssignments =
-                determineParallelismAndAssignResources(slotAllocator);
-
-        JobGraph adjustedJobGraph = jobInformation.copyJobGraph();
-        for (JobVertex vertex : adjustedJobGraph.getVertices()) {
-            vertex.setParallelism(parallelismAndResourceAssignments.getParallelism(vertex.getID()));
-        }
-
+    @Override
+    public CreatingExecutionGraph.AssignmentResult tryToAssignSlots(
+            CreatingExecutionGraph.ExecutionGraphWithVertexParallelism
+                    executionGraphWithVertexParallelism) {
         final ExecutionGraph executionGraph =
-                createExecutionGraphAndRestoreStateAsync(adjustedJobGraph).join();
+                executionGraphWithVertexParallelism.getExecutionGraph();
 
         executionGraph.start(componentMainThreadExecutor);
         executionGraph.transitionToRunning();
 
         executionGraph.setInternalTaskFailuresListener(
                 new UpdateSchedulerNgOnInternalFailuresListener(this));
 
+        final VertexParallelism vertexParallelism =
+                executionGraphWithVertexParallelism.getVertexParallelism();
+        return slotAllocator
+                .tryReserveResources(vertexParallelism)
+                .map(
+                        reservedSlots ->
+                                CreatingExecutionGraph.AssignmentResult.success(
+                                        assignSlotsToExecutionGraph(executionGraph, reservedSlots)))
+                .orElseGet(CreatingExecutionGraph.AssignmentResult::notPossible);
+    }
+
+    @Nonnull
+    private ExecutionGraph assignSlotsToExecutionGraph(
+            ExecutionGraph executionGraph,
+            org.apache.flink.runtime.scheduler.adaptive.allocator.ReservedSlots assignedSlots) {

Review comment:
       Indeed. I will fix it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to