tillrohrmann commented on a change in pull request #15251:
URL: https://github.com/apache/flink/pull/15251#discussion_r597594777
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
##########
@@ -713,58 +698,97 @@ public void goToFinished(ArchivedExecutionGraph
archivedExecutionGraph) {
@Override
public void goToCreatingExecutionGraph() {
- CompletableFuture<ExecutionGraph> executionGraphFuture;
+ final
CompletableFuture<CreatingExecutionGraph.ExecutionGraphWithVertexParallelism>
+ executionGraphWithAvailableResourcesFuture =
+ createExecutionGraphWithAvailableResourcesAsync();
+
+ transitionToState(
+ new CreatingExecutionGraph.Factory(
+ this, executionGraphWithAvailableResourcesFuture,
LOG));
+ }
+
+ private
CompletableFuture<CreatingExecutionGraph.ExecutionGraphWithVertexParallelism>
+ createExecutionGraphWithAvailableResourcesAsync() {
+ final JobGraph adjustedJobGraph;
+ final VertexParallelism vertexParallelism;
try {
- final ExecutionGraph executionGraph =
createExecutionGraphWithAvailableResources();
- executionGraphFuture =
CompletableFuture.completedFuture(executionGraph);
+ vertexParallelism = determineParallelism(slotAllocator);
+
+ adjustedJobGraph = jobInformation.copyJobGraph();
+ for (JobVertex vertex : adjustedJobGraph.getVertices()) {
+
vertex.setParallelism(vertexParallelism.getParallelism(vertex.getID()));
+ }
} catch (Exception exception) {
- executionGraphFuture =
FutureUtils.completedExceptionally(exception);
+ return FutureUtils.completedExceptionally(exception);
}
- transitionToState(new CreatingExecutionGraph.Factory(this,
executionGraphFuture, LOG));
+ return createExecutionGraphAndRestoreStateAsync(adjustedJobGraph)
+ .thenApply(
+ executionGraph ->
+
CreatingExecutionGraph.ExecutionGraphWithVertexParallelism.create(
+ executionGraph, vertexParallelism));
}
- ExecutionGraph createExecutionGraphWithAvailableResources() throws
Exception {
- final ParallelismAndResourceAssignments
parallelismAndResourceAssignments =
- determineParallelismAndAssignResources(slotAllocator);
-
- JobGraph adjustedJobGraph = jobInformation.copyJobGraph();
- for (JobVertex vertex : adjustedJobGraph.getVertices()) {
-
vertex.setParallelism(parallelismAndResourceAssignments.getParallelism(vertex.getID()));
- }
-
+ @Override
+ public CreatingExecutionGraph.AssignmentResult tryToAssignSlots(
+ CreatingExecutionGraph.ExecutionGraphWithVertexParallelism
+ executionGraphWithVertexParallelism) {
final ExecutionGraph executionGraph =
-
createExecutionGraphAndRestoreStateAsync(adjustedJobGraph).join();
+ executionGraphWithVertexParallelism.getExecutionGraph();
executionGraph.start(componentMainThreadExecutor);
executionGraph.transitionToRunning();
executionGraph.setInternalTaskFailuresListener(
new UpdateSchedulerNgOnInternalFailuresListener(this));
+ final VertexParallelism vertexParallelism =
+ executionGraphWithVertexParallelism.getVertexParallelism();
+ return slotAllocator
+ .tryReserveResources(vertexParallelism)
+ .map(
+ reservedSlots ->
+
CreatingExecutionGraph.AssignmentResult.success(
+
assignSlotsToExecutionGraph(executionGraph, reservedSlots)))
+
.orElseGet(CreatingExecutionGraph.AssignmentResult::notPossible);
+ }
+
+ @Nonnull
+ private ExecutionGraph assignSlotsToExecutionGraph(
+ ExecutionGraph executionGraph,
+
org.apache.flink.runtime.scheduler.adaptive.allocator.ReservedSlots
assignedSlots) {
Review comment:
Indeed. I will fix it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]