This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1f5f9aa5e82e75a104229d551a2d8b443f32885a
Author: Till Rohrmann <[email protected]>
AuthorDate: Tue Mar 16 10:59:56 2021 +0100

    [FLINK-21602] Split ExecutionGraph generation and slot assignments into two steps
    
    In order to properly support the asynchronous ExecutionGraph generation, we need to handle
    the case where the set of available slots has changed after the ExecutionGraph has been created.
    In this situation, we need to check whether the expected set of resources is still there
    and, if not, go back to WaitingForResources. Moreover, we need to schedule a resource
    check when entering the Executing state in order to not miss newly arrived slots.
---
 .../scheduler/adaptive/AdaptiveScheduler.java      | 125 ++++++----
 .../scheduler/adaptive/CreatingExecutionGraph.java |  65 ++++-
 .../runtime/scheduler/adaptive/Executing.java      |  15 ++
 .../ParallelismAndResourceAssignments.java         |  49 ----
 .../adaptive/allocator/VertexParallelism.java      |   2 +
 .../VertexParallelismWithSlotSharing.java          |   5 +
 .../jobmaster/slotpool/SlotPoolTestUtils.java      |  12 +-
 .../scheduler/adaptive/AdaptiveSchedulerTest.java  | 277 ++++++++++++++-------
 .../adaptive/CreatingExecutionGraphTest.java       |  67 +++--
 .../runtime/scheduler/adaptive/ExecutingTest.java  |  27 +-
 .../adaptive/StateTrackingMockExecutionGraph.java  |  16 +-
 .../adaptive/allocator/TestingSlotAllocator.java   | 120 +++++++++
 12 files changed, 549 insertions(+), 231 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
index 3bac7a0..badad27 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
@@ -577,30 +577,16 @@ public class AdaptiveScheduler
         return outstandingResources.isEmpty();
     }
 
-    private ParallelismAndResourceAssignments 
determineParallelismAndAssignResources(
-            SlotAllocator slotAllocator) throws JobExecutionException {
-
-        final VertexParallelism vertexParallelism =
-                slotAllocator
-                        .determineParallelism(
-                                jobInformation, 
declarativeSlotPool.getFreeSlotsInformation())
-                        .orElseThrow(
-                                () ->
-                                        new JobExecutionException(
-                                                jobInformation.getJobID(),
-                                                "Not enough resources 
available for scheduling."));
-
-        final ReservedSlots reservedSlots =
-                slotAllocator
-                        .tryReserveResources(vertexParallelism)
-                        .orElseThrow(
-                                () ->
-                                        new JobExecutionException(
-                                                jobInformation.getJobID(),
-                                                "Could not reserve all 
required slots."));
+    private VertexParallelism determineParallelism(SlotAllocator slotAllocator)
+            throws JobExecutionException {
 
-        return new ParallelismAndResourceAssignments(
-                reservedSlots, 
vertexParallelism.getMaxParallelismForVertices());
+        return slotAllocator
+                .determineParallelism(jobInformation, 
declarativeSlotPool.getFreeSlotsInformation())
+                .orElseThrow(
+                        () ->
+                                new JobExecutionException(
+                                        jobInformation.getJobID(),
+                                        "Not enough resources available for 
scheduling."));
     }
 
     @Override
@@ -713,29 +699,44 @@ public class AdaptiveScheduler
 
     @Override
     public void goToCreatingExecutionGraph() {
-        CompletableFuture<ExecutionGraph> executionGraphFuture;
+        final 
CompletableFuture<CreatingExecutionGraph.ExecutionGraphWithVertexParallelism>
+                executionGraphWithAvailableResourcesFuture =
+                        createExecutionGraphWithAvailableResourcesAsync();
+
+        transitionToState(
+                new CreatingExecutionGraph.Factory(
+                        this, executionGraphWithAvailableResourcesFuture, 
LOG));
+    }
+
+    private 
CompletableFuture<CreatingExecutionGraph.ExecutionGraphWithVertexParallelism>
+            createExecutionGraphWithAvailableResourcesAsync() {
+        final JobGraph adjustedJobGraph;
+        final VertexParallelism vertexParallelism;
 
         try {
-            final ExecutionGraph executionGraph = 
createExecutionGraphWithAvailableResources();
-            executionGraphFuture = 
CompletableFuture.completedFuture(executionGraph);
+            vertexParallelism = determineParallelism(slotAllocator);
+
+            adjustedJobGraph = jobInformation.copyJobGraph();
+            for (JobVertex vertex : adjustedJobGraph.getVertices()) {
+                
vertex.setParallelism(vertexParallelism.getParallelism(vertex.getID()));
+            }
         } catch (Exception exception) {
-            executionGraphFuture = 
FutureUtils.completedExceptionally(exception);
+            return FutureUtils.completedExceptionally(exception);
         }
 
-        transitionToState(new CreatingExecutionGraph.Factory(this, 
executionGraphFuture, LOG));
+        return createExecutionGraphAndRestoreStateAsync(adjustedJobGraph)
+                .thenApply(
+                        executionGraph ->
+                                
CreatingExecutionGraph.ExecutionGraphWithVertexParallelism.create(
+                                        executionGraph, vertexParallelism));
     }
 
-    ExecutionGraph createExecutionGraphWithAvailableResources() throws 
Exception {
-        final ParallelismAndResourceAssignments 
parallelismAndResourceAssignments =
-                determineParallelismAndAssignResources(slotAllocator);
-
-        JobGraph adjustedJobGraph = jobInformation.copyJobGraph();
-        for (JobVertex vertex : adjustedJobGraph.getVertices()) {
-            
vertex.setParallelism(parallelismAndResourceAssignments.getParallelism(vertex.getID()));
-        }
-
+    @Override
+    public CreatingExecutionGraph.AssignmentResult tryToAssignSlots(
+            CreatingExecutionGraph.ExecutionGraphWithVertexParallelism
+                    executionGraphWithVertexParallelism) {
         final ExecutionGraph executionGraph =
-                
createExecutionGraphAndRestoreStateAsync(adjustedJobGraph).join();
+                executionGraphWithVertexParallelism.getExecutionGraph();
 
         executionGraph.start(componentMainThreadExecutor);
         executionGraph.transitionToRunning();
@@ -743,28 +744,51 @@ public class AdaptiveScheduler
         executionGraph.setInternalTaskFailuresListener(
                 new UpdateSchedulerNgOnInternalFailuresListener(this));
 
+        final VertexParallelism vertexParallelism =
+                executionGraphWithVertexParallelism.getVertexParallelism();
+        return slotAllocator
+                .tryReserveResources(vertexParallelism)
+                .map(
+                        reservedSlots ->
+                                
CreatingExecutionGraph.AssignmentResult.success(
+                                        
assignSlotsToExecutionGraph(executionGraph, reservedSlots)))
+                
.orElseGet(CreatingExecutionGraph.AssignmentResult::notPossible);
+    }
+
+    @Nonnull
+    private ExecutionGraph assignSlotsToExecutionGraph(
+            ExecutionGraph executionGraph, ReservedSlots reservedSlots) {
         for (ExecutionVertex executionVertex : 
executionGraph.getAllExecutionVertices()) {
-            final LogicalSlot assignedSlot =
-                    
parallelismAndResourceAssignments.getAssignedSlot(executionVertex.getID());
+            final LogicalSlot assignedSlot = 
reservedSlots.getSlotFor(executionVertex.getID());
             executionVertex
                     .getCurrentExecutionAttempt()
                     
.registerProducedPartitions(assignedSlot.getTaskManagerLocation(), false);
             executionVertex.tryAssignResource(assignedSlot);
         }
+
         return executionGraph;
     }
 
     private CompletableFuture<ExecutionGraph> 
createExecutionGraphAndRestoreStateAsync(
             JobGraph adjustedJobGraph) {
         return CompletableFuture.supplyAsync(
-                () -> {
-                    try {
-                        return 
createExecutionGraphAndRestoreState(adjustedJobGraph);
-                    } catch (Exception exception) {
-                        throw new CompletionException(exception);
-                    }
-                },
-                ioExecutor);
+                        () -> {
+                            try {
+                                return 
createExecutionGraphAndRestoreState(adjustedJobGraph);
+                            } catch (Exception exception) {
+                                throw new CompletionException(exception);
+                            }
+                        },
+                        ioExecutor)
+                .handleAsync(
+                        (executionGraph, throwable) -> {
+                            if (throwable != null) {
+                                throw new CompletionException(throwable);
+                            } else {
+                                return executionGraph;
+                            }
+                        },
+                        getMainThreadExecutor());
     }
 
     @Nonnull
@@ -969,11 +993,6 @@ public class AdaptiveScheduler
                 () -> runIfState(expectedState, action), delay.toMillis(), 
TimeUnit.MILLISECONDS);
     }
 
-    @Override
-    public CreatingExecutionGraph.AssignmentResult 
tryToAssignSlots(ExecutionGraph executionGraph) {
-        return CreatingExecutionGraph.AssignmentResult.success(executionGraph);
-    }
-
     // ----------------------------------------------------------------
 
     /** Note: Do not call this method from a State constructor or 
State#onLeave. */
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java
index 5e21ba6..79a265d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.scheduler.adaptive.allocator.VertexParallelism;
 import org.apache.flink.util.Preconditions;
 
 import org.slf4j.Logger;
@@ -47,23 +48,29 @@ public class CreatingExecutionGraph implements State {
     private final Logger log;
 
     public CreatingExecutionGraph(
-            Context context, CompletableFuture<ExecutionGraph> 
executionGraphFuture, Logger log) {
+            Context context,
+            CompletableFuture<ExecutionGraphWithVertexParallelism>
+                    executionGraphWithParallelismFuture,
+            Logger log) {
         this.context = context;
         this.log = log;
 
         FutureUtils.assertNoException(
-                executionGraphFuture.handle(
-                        (executionGraph, throwable) -> {
+                executionGraphWithParallelismFuture.handle(
+                        (executionGraphWithVertexParallelism, throwable) -> {
                             context.runIfState(
                                     this,
-                                    () -> 
handleExecutionGraphCreation(executionGraph, throwable),
+                                    () ->
+                                            handleExecutionGraphCreation(
+                                                    
executionGraphWithVertexParallelism, throwable),
                                     Duration.ZERO);
                             return null;
                         }));
     }
 
     private void handleExecutionGraphCreation(
-            @Nullable ExecutionGraph executionGraph, @Nullable Throwable 
throwable) {
+            @Nullable ExecutionGraphWithVertexParallelism 
executionGraphWithVertexParallelism,
+            @Nullable Throwable throwable) {
         if (throwable != null) {
             log.info(
                     "Failed to go from {} to {} because the ExecutionGraph 
creation failed.",
@@ -72,11 +79,16 @@ public class CreatingExecutionGraph implements State {
                     throwable);
             
context.goToFinished(context.getArchivedExecutionGraph(JobStatus.FAILED, 
throwable));
         } else {
-            final AssignmentResult result = 
context.tryToAssignSlots(executionGraph);
+            final AssignmentResult result =
+                    
context.tryToAssignSlots(executionGraphWithVertexParallelism);
 
             if (result.isSuccess()) {
+                log.debug(
+                        "Successfully reserved and assigned the required slots 
for the ExecutionGraph.");
                 context.goToExecuting(result.getExecutionGraph());
             } else {
+                log.debug(
+                        "Failed to reserve and assign the required slots. 
Waiting for new resources.");
                 context.goToWaitingForResources();
             }
         }
@@ -159,10 +171,12 @@ public class CreatingExecutionGraph implements State {
          * method returns a successful {@link AssignmentResult} which contains 
the assigned {@link
          * ExecutionGraph}. If not, then the assignment result is a failure.
          *
-         * @param executionGraph executionGraph to assign slots to
+         * @param executionGraphWithVertexParallelism 
executionGraphWithVertexParallelism to assign
+         *     slots to resources
          * @return {@link AssignmentResult} representing the result of the 
assignment
          */
-        AssignmentResult tryToAssignSlots(ExecutionGraph executionGraph);
+        AssignmentResult tryToAssignSlots(
+                ExecutionGraphWithVertexParallelism 
executionGraphWithVertexParallelism);
     }
 
     /**
@@ -207,16 +221,18 @@ public class CreatingExecutionGraph implements State {
 
         private final Context context;
 
-        private final CompletableFuture<ExecutionGraph> executionGraphFuture;
+        private final CompletableFuture<ExecutionGraphWithVertexParallelism>
+                executionGraphWithParallelismFuture;
 
         private final Logger log;
 
         Factory(
                 Context context,
-                CompletableFuture<ExecutionGraph> executionGraphFuture,
+                CompletableFuture<ExecutionGraphWithVertexParallelism>
+                        executionGraphWithParallelismFuture,
                 Logger log) {
             this.context = context;
-            this.executionGraphFuture = executionGraphFuture;
+            this.executionGraphWithParallelismFuture = 
executionGraphWithParallelismFuture;
             this.log = log;
         }
 
@@ -227,7 +243,32 @@ public class CreatingExecutionGraph implements State {
 
         @Override
         public CreatingExecutionGraph getState() {
-            return new CreatingExecutionGraph(context, executionGraphFuture, 
log);
+            return new CreatingExecutionGraph(context, 
executionGraphWithParallelismFuture, log);
+        }
+    }
+
+    static class ExecutionGraphWithVertexParallelism {
+        private final ExecutionGraph executionGraph;
+
+        private final VertexParallelism vertexParallelism;
+
+        private ExecutionGraphWithVertexParallelism(
+                ExecutionGraph executionGraph, VertexParallelism 
vertexParallelism) {
+            this.executionGraph = executionGraph;
+            this.vertexParallelism = vertexParallelism;
+        }
+
+        public static ExecutionGraphWithVertexParallelism create(
+                ExecutionGraph executionGraph, VertexParallelism 
vertexParallelism) {
+            return new ExecutionGraphWithVertexParallelism(executionGraph, 
vertexParallelism);
+        }
+
+        public ExecutionGraph getExecutionGraph() {
+            return executionGraph;
+        }
+
+        public VertexParallelism getVertexParallelism() {
+            return vertexParallelism;
         }
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
index 9ab0456..74d4afc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
@@ -35,6 +35,7 @@ import org.slf4j.Logger;
 import javax.annotation.Nullable;
 
 import java.time.Duration;
+import java.util.concurrent.ScheduledFuture;
 
 /** State which represents a running job with an {@link ExecutionGraph} and 
assigned slots. */
 class Executing extends StateWithExecutionGraph implements ResourceConsumer {
@@ -55,6 +56,9 @@ class Executing extends StateWithExecutionGraph implements 
ResourceConsumer {
         this.userCodeClassLoader = userCodeClassLoader;
 
         deploy();
+
+        // check if new resources have come available in the meantime
+        context.runIfState(this, this::notifyNewResourcesAvailable, 
Duration.ZERO);
     }
 
     @Override
@@ -206,6 +210,17 @@ class Executing extends StateWithExecutionGraph implements 
ResourceConsumer {
                 ExecutionGraphHandler executionGraphHandler,
                 OperatorCoordinatorHandler operatorCoordinatorHandler,
                 Throwable failureCause);
+
+        /**
+         * Runs the given action after a delay if the state at this time 
equals the expected state.
+         *
+         * @param expectedState expectedState describes the required state at 
the time of running
+         *     the action
+         * @param action action to run if the expected state equals the actual 
state
+         * @param delay delay after which to run the action
+         * @return a ScheduledFuture representing pending completion of the 
task
+         */
+        ScheduledFuture<?> runIfState(State expectedState, Runnable action, 
Duration delay);
     }
 
     /**
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/ParallelismAndResourceAssignments.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/ParallelismAndResourceAssignments.java
deleted file mode 100644
index febf3aa..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/ParallelismAndResourceAssignments.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.scheduler.adaptive;
-
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobmaster.LogicalSlot;
-import org.apache.flink.runtime.scheduler.adaptive.allocator.ReservedSlots;
-import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
-import org.apache.flink.util.Preconditions;
-
-import java.util.Map;
-
-/** Assignment of slots to execution vertices. */
-public final class ParallelismAndResourceAssignments {
-    private final ReservedSlots reservedSlots;
-
-    private final Map<JobVertexID, Integer> parallelismPerJobVertex;
-
-    public ParallelismAndResourceAssignments(
-            ReservedSlots reservedSlots, Map<JobVertexID, Integer> 
parallelismPerJobVertex) {
-        this.reservedSlots = reservedSlots;
-        this.parallelismPerJobVertex = parallelismPerJobVertex;
-    }
-
-    public int getParallelism(JobVertexID jobVertexId) {
-        
Preconditions.checkState(parallelismPerJobVertex.containsKey(jobVertexId));
-        return parallelismPerJobVertex.get(jobVertexId);
-    }
-
-    public LogicalSlot getAssignedSlot(ExecutionVertexID executionVertexId) {
-        return reservedSlots.getSlotFor(executionVertexId);
-    }
-}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/VertexParallelism.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/VertexParallelism.java
index d4be287..0a19a32 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/VertexParallelism.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/VertexParallelism.java
@@ -31,4 +31,6 @@ import java.util.Map;
  */
 public interface VertexParallelism {
     Map<JobVertexID, Integer> getMaxParallelismForVertices();
+
+    int getParallelism(JobVertexID jobVertexId);
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/VertexParallelismWithSlotSharing.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/VertexParallelismWithSlotSharing.java
index 8ed9ace..05bcadf 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/VertexParallelismWithSlotSharing.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/VertexParallelismWithSlotSharing.java
@@ -44,4 +44,9 @@ public class VertexParallelismWithSlotSharing implements 
VertexParallelism {
     public Map<JobVertexID, Integer> getMaxParallelismForVertices() {
         return vertexParallelism;
     }
+
+    @Override
+    public int getParallelism(JobVertexID jobVertexId) {
+        return vertexParallelism.get(jobVertexId);
+    }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTestUtils.java
index 8bf8c6a..f83e7cd 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTestUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTestUtils.java
@@ -38,7 +38,7 @@ public final class SlotPoolTestUtils {
         throw new UnsupportedOperationException("This class should never be 
instantiated.");
     }
 
-    static TaskManagerGateway createTaskManagerGateway(
+    public static TaskManagerGateway createTaskManagerGateway(
             @Nullable TaskExecutorGateway taskExecutorGateway) {
         return new RpcTaskManagerGateway(
                 taskExecutorGateway == null
@@ -50,7 +50,15 @@ public final class SlotPoolTestUtils {
     @Nonnull
     public static Collection<SlotOffer> offerSlots(
             DeclarativeSlotPool slotPool, Collection<? extends SlotOffer> 
slotOffers) {
+        return offerSlots(slotPool, slotOffers, 
createTaskManagerGateway(null));
+    }
+
+    @Nonnull
+    public static Collection<SlotOffer> offerSlots(
+            DeclarativeSlotPool slotPool,
+            Collection<? extends SlotOffer> slotOffers,
+            TaskManagerGateway taskManagerGateway) {
         return slotPool.offerSlots(
-                slotOffers, new LocalTaskManagerLocation(), 
createTaskManagerGateway(null), 0);
+                slotOffers, new LocalTaskManagerLocation(), 
taskManagerGateway, 0);
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
index 8a433a4..c4ba684 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.core.testutils.FlinkMatchers;
+import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
@@ -34,6 +35,7 @@ import 
org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
 import org.apache.flink.runtime.checkpoint.TestingCheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.TestingCheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.TestingCompletedCheckpointStore;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
@@ -42,7 +44,6 @@ import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.SuppressRestartsException;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
 import 
org.apache.flink.runtime.executiongraph.failover.flip1.NoRestartBackoffTimeStrategy;
 import 
org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy;
@@ -58,9 +59,12 @@ import 
org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import 
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.jobmaster.TestUtils;
 import org.apache.flink.runtime.jobmaster.slotpool.DefaultAllocatedSlotPool;
 import org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolTestUtils;
+import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
@@ -70,15 +74,18 @@ import 
org.apache.flink.runtime.operators.coordination.TaskNotRunningException;
 import org.apache.flink.runtime.operators.coordination.TestOperatorEvent;
 import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
 import org.apache.flink.runtime.scheduler.GloballyTerminalJobStatusListener;
+import 
org.apache.flink.runtime.scheduler.adaptive.allocator.TestingSlotAllocator;
 import org.apache.flink.runtime.slots.ResourceRequirement;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
 import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.util.ResourceCounter;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
-import org.apache.flink.util.ExecutorUtils;
+import org.apache.flink.testutils.executor.TestExecutorResource;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.FunctionUtils;
 
 import org.hamcrest.MatcherAssert;
 import org.hamcrest.Matchers;
@@ -93,6 +100,9 @@ import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
 import java.time.Duration;
+import java.util.Optional;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -109,6 +119,7 @@ import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
 
 /** Tests for the {@link AdaptiveScheduler}. */
@@ -119,6 +130,10 @@ public class AdaptiveSchedulerTest extends TestLogger {
 
     @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
 
+    @ClassRule
+    public static final TestExecutorResource<ScheduledExecutorService> 
TEST_EXECUTOR_RESOURCE =
+            new 
TestExecutorResource<>(Executors::newSingleThreadScheduledExecutor);
+
     static {
         JOB_VERTEX = new JobVertex("v1");
         JOB_VERTEX.setParallelism(PARALLELISM);
@@ -128,6 +143,10 @@ public class AdaptiveSchedulerTest extends TestLogger {
     private final ManuallyTriggeredComponentMainThreadExecutor 
mainThreadExecutor =
             new 
ManuallyTriggeredComponentMainThreadExecutor(Thread.currentThread());
 
+    private final ComponentMainThreadExecutor singleThreadMainThreadExecutor =
+            ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(
+                    TEST_EXECUTOR_RESOURCE.getExecutor());
+
     @Test
     public void testInitialState() throws Exception {
         final AdaptiveScheduler scheduler =
@@ -236,22 +255,39 @@ public class AdaptiveSchedulerTest extends TestLogger {
         final DefaultDeclarativeSlotPool declarativeSlotPool =
                 createDeclarativeSlotPool(jobGraph.getJobID());
 
+        final Configuration configuration = new Configuration();
+        configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, 
Duration.ofMillis(1L));
+
         final AdaptiveScheduler scheduler =
-                new AdaptiveSchedulerBuilder(jobGraph, mainThreadExecutor)
+                new AdaptiveSchedulerBuilder(jobGraph, 
singleThreadMainThreadExecutor)
                         .setDeclarativeSlotPool(declarativeSlotPool)
+                        .setJobMasterConfiguration(configuration)
                         .build();
 
-        scheduler.startScheduling();
-
         final int numAvailableSlots = 1;
 
-        offerSlots(
-                declarativeSlotPool,
-                createSlotOffersForResourceRequirements(
-                        ResourceCounter.withResource(ResourceProfile.UNKNOWN, 
numAvailableSlots)));
+        final OneShotLatch submitTaskLatch = new OneShotLatch();
+        final TaskManagerGateway taskManagerGateway =
+                
createWaitingForTaskSubmissionTaskManagerGateway(submitTaskLatch);
+
+        singleThreadMainThreadExecutor.execute(
+                () -> {
+                    scheduler.startScheduling();
+                    offerSlots(
+                            declarativeSlotPool,
+                            createSlotOffersForResourceRequirements(
+                                    ResourceCounter.withResource(
+                                            ResourceProfile.UNKNOWN, 
numAvailableSlots)),
+                            taskManagerGateway);
+                });
 
-        final ExecutionGraph executionGraph =
-                scheduler.createExecutionGraphWithAvailableResources();
+        submitTaskLatch.await();
+
+        final ArchivedExecutionGraph executionGraph =
+                CompletableFuture.supplyAsync(
+                                () -> 
scheduler.requestJob().getArchivedExecutionGraph(),
+                                singleThreadMainThreadExecutor)
+                        .join();
 
         assertThat(
                 
executionGraph.getJobVertex(JOB_VERTEX.getID()).getParallelism(),
@@ -267,20 +303,33 @@ public class AdaptiveSchedulerTest extends TestLogger {
                 createDeclarativeSlotPool(jobGraph.getJobID());
 
         final AdaptiveScheduler adaptiveScheduler =
-                new AdaptiveSchedulerBuilder(jobGraph, mainThreadExecutor)
+                new AdaptiveSchedulerBuilder(jobGraph, 
singleThreadMainThreadExecutor)
                         .setInitializationTimestamp(initializationTimestamp)
                         .setDeclarativeSlotPool(declarativeSlotPool)
                         .build();
 
-        adaptiveScheduler.startScheduling();
+        final OneShotLatch submitTaskLatch = new OneShotLatch();
+        final TaskManagerGateway taskManagerGateway =
+                
createWaitingForTaskSubmissionTaskManagerGateway(submitTaskLatch);
 
-        offerSlots(
-                declarativeSlotPool,
-                createSlotOffersForResourceRequirements(
-                        ResourceCounter.withResource(ResourceProfile.UNKNOWN, 
1)));
+        singleThreadMainThreadExecutor.execute(
+                () -> {
+                    adaptiveScheduler.startScheduling();
+                    offerSlots(
+                            declarativeSlotPool,
+                            createSlotOffersForResourceRequirements(
+                                    ResourceCounter.withResource(
+                                            ResourceProfile.UNKNOWN, 
PARALLELISM)),
+                            taskManagerGateway);
+                });
 
-        final ExecutionGraph executionGraph =
-                adaptiveScheduler.createExecutionGraphWithAvailableResources();
+        submitTaskLatch.await();
+
+        final ArchivedExecutionGraph executionGraph =
+                CompletableFuture.supplyAsync(
+                                () -> 
adaptiveScheduler.requestJob().getArchivedExecutionGraph(),
+                                singleThreadMainThreadExecutor)
+                        .join();
 
         assertThat(
                 executionGraph.getStatusTimestamp(JobStatus.INITIALIZING),
@@ -380,9 +429,10 @@ public class AdaptiveSchedulerTest extends TestLogger {
 
         final Configuration configuration = new Configuration();
         configuration.set(JobManagerOptions.MIN_PARALLELISM_INCREASE, 1);
+        configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, 
Duration.ofMillis(1L));
 
         final AdaptiveScheduler scheduler =
-                new AdaptiveSchedulerBuilder(jobGraph, mainThreadExecutor)
+                new AdaptiveSchedulerBuilder(jobGraph, 
singleThreadMainThreadExecutor)
                         .setJobMasterConfiguration(configuration)
                         .setJobManagerJobMetricGroup(
                                 new JobManagerJobMetricGroup(
@@ -395,39 +445,52 @@ public class AdaptiveSchedulerTest extends TestLogger {
 
         final Gauge<Integer> numRestartsMetric = numRestartsMetricFuture.get();
 
-        scheduler.startScheduling();
-
         final SimpleAckingTaskManagerGateway taskManagerGateway =
                 new SimpleAckingTaskManagerGateway();
+        final BlockingQueue<AllocationID> submittedTasks = new 
ArrayBlockingQueue<>(5);
+        taskManagerGateway.setSubmitConsumer(
+                taskDeploymentDescriptor ->
+                        
submittedTasks.offer(taskDeploymentDescriptor.getAllocationId()));
+
         taskManagerGateway.setCancelConsumer(
                 executionAttemptId ->
-                        mainThreadExecutor.execute(
+                        singleThreadMainThreadExecutor.execute(
                                 () ->
                                         scheduler.updateTaskExecutionState(
                                                 new TaskExecutionState(
                                                         executionAttemptId,
                                                         
ExecutionState.CANCELED))));
 
-        declarativeSlotPool.offerSlots(
-                createSlotOffersForResourceRequirements(
-                        ResourceCounter.withResource(ResourceProfile.UNKNOWN, 
1)),
-                new LocalTaskManagerLocation(),
-                taskManagerGateway,
-                System.currentTimeMillis());
+        singleThreadMainThreadExecutor.execute(
+                () -> {
+                    scheduler.startScheduling();
+
+                    declarativeSlotPool.offerSlots(
+                            createSlotOffersForResourceRequirements(
+                                    
ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1)),
+                            new LocalTaskManagerLocation(),
+                            taskManagerGateway,
+                            System.currentTimeMillis());
+                });
 
-        // trigger resource timeout and the deployment of the job
-        mainThreadExecutor.triggerAllNonPeriodicTasks();
+        // wait for the first task submission
+        submittedTasks.take();
 
         assertThat(numRestartsMetric.getValue(), is(0));
 
-        // offer more slots, which will cause a restart in order to scale up
-        offerSlots(
-                declarativeSlotPool,
-                createSlotOffersForResourceRequirements(
-                        ResourceCounter.withResource(ResourceProfile.UNKNOWN, 
PARALLELISM)));
+        singleThreadMainThreadExecutor.execute(
+                () -> {
+                    // offer more slots, which will cause a restart in order 
to scale up
+                    offerSlots(
+                            declarativeSlotPool,
+                            createSlotOffersForResourceRequirements(
+                                    ResourceCounter.withResource(
+                                            ResourceProfile.UNKNOWN, 
PARALLELISM)),
+                            taskManagerGateway);
+                });
 
-        // trigger cancellation of tasks and the restart
-        mainThreadExecutor.triggerAllNonPeriodicTasks();
+        // wait for the second task submission
+        submittedTasks.take();
 
         assertThat(numRestartsMetric.getValue(), is(1));
     }
@@ -731,47 +794,34 @@ public class AdaptiveSchedulerTest extends TestLogger {
         final GloballyTerminalJobStatusListener jobStatusListener =
                 new GloballyTerminalJobStatusListener();
 
-        final ScheduledExecutorService singleThreadExecutor =
-                Executors.newSingleThreadScheduledExecutor();
-
-        try {
-            final ComponentMainThreadExecutor singleMainThreadExecutor =
-                    
ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(
-                            singleThreadExecutor);
-            final AdaptiveScheduler adaptiveScheduler =
-                    new AdaptiveSchedulerBuilder(jobGraphWithNewOperator, 
singleMainThreadExecutor)
-                            .setDeclarativeSlotPool(declarativeSlotPool)
-                            .setJobStatusListener(jobStatusListener)
-                            .build();
-
-            adaptiveScheduler.startScheduling();
-
-            singleMainThreadExecutor.execute(
-                    () ->
-                            offerSlots(
-                                    declarativeSlotPool,
-                                    createSlotOffersForResourceRequirements(
-                                            ResourceCounter.withResource(
-                                                    ResourceProfile.UNKNOWN, 
1))));
-
-            assertThat(jobStatusListener.getTerminationFuture().join(), 
is(JobStatus.FAILED));
-
-            final ArchivedExecutionGraph archivedExecutionGraph =
-                    CompletableFuture.supplyAsync(
-                                    () ->
-                                            adaptiveScheduler
-                                                    .requestJob()
-                                                    
.getArchivedExecutionGraph(),
-                                    singleMainThreadExecutor)
-                            .join();
-
-            assertThat(archivedExecutionGraph.getState(), 
is(JobStatus.FAILED));
-            assertThat(
-                    archivedExecutionGraph.getFailureInfo().getException(),
-                    FlinkMatchers.containsMessage("Failed to rollback to 
checkpoint/savepoint"));
-        } finally {
-            ExecutorUtils.gracefulShutdown(10L, TimeUnit.SECONDS, 
singleThreadExecutor);
-        }
+        final AdaptiveScheduler adaptiveScheduler =
+                new AdaptiveSchedulerBuilder(
+                                jobGraphWithNewOperator, 
singleThreadMainThreadExecutor)
+                        .setDeclarativeSlotPool(declarativeSlotPool)
+                        .setJobStatusListener(jobStatusListener)
+                        .build();
+
+        singleThreadMainThreadExecutor.execute(
+                () -> {
+                    adaptiveScheduler.startScheduling();
+                    offerSlots(
+                            declarativeSlotPool,
+                            createSlotOffersForResourceRequirements(
+                                    
ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1)));
+                });
+
+        assertThat(jobStatusListener.getTerminationFuture().join(), 
is(JobStatus.FAILED));
+
+        final ArchivedExecutionGraph archivedExecutionGraph =
+                CompletableFuture.supplyAsync(
+                                () -> 
adaptiveScheduler.requestJob().getArchivedExecutionGraph(),
+                                singleThreadMainThreadExecutor)
+                        .join();
+
+        assertThat(archivedExecutionGraph.getState(), is(JobStatus.FAILED));
+        assertThat(
+                archivedExecutionGraph.getFailureInfo().getException(),
+                FlinkMatchers.containsMessage("Failed to rollback to 
checkpoint/savepoint"));
     }
 
     @Test
@@ -807,26 +857,81 @@ public class AdaptiveSchedulerTest extends TestLogger {
                 createDeclarativeSlotPool(jobGraphWithNewOperator.getJobID());
 
         AdaptiveScheduler adaptiveScheduler =
-                new AdaptiveSchedulerBuilder(jobGraphWithNewOperator, 
mainThreadExecutor)
+                new AdaptiveSchedulerBuilder(
+                                jobGraphWithNewOperator, 
singleThreadMainThreadExecutor)
                         
.setCheckpointRecoveryFactory(testingCheckpointRecoveryFactory)
                         .setDeclarativeSlotPool(declarativeSlotPool)
                         .build();
 
-        adaptiveScheduler.startScheduling();
+        final OneShotLatch submitTaskLatch = new OneShotLatch();
+        final TaskManagerGateway taskManagerGateway =
+                
createWaitingForTaskSubmissionTaskManagerGateway(submitTaskLatch);
 
-        offerSlots(
-                declarativeSlotPool,
-                createSlotOffersForResourceRequirements(
-                        ResourceCounter.withResource(ResourceProfile.UNKNOWN, 
1)));
+        singleThreadMainThreadExecutor.execute(
+                () -> {
+                    adaptiveScheduler.startScheduling();
+
+                    offerSlots(
+                            declarativeSlotPool,
+                            createSlotOffersForResourceRequirements(
+                                    
ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1)),
+                            taskManagerGateway);
+                });
+
+        submitTaskLatch.await();
 
         // starting and offering the required slots should trigger the 
ExecutionGraph creation
-        final CompletedCheckpoint savepoint = 
completedCheckpointStore.getLatestCheckpoint(false);
+        final CompletedCheckpoint savepoint =
+                CompletableFuture.supplyAsync(
+                                FunctionUtils.uncheckedSupplier(
+                                        () -> 
completedCheckpointStore.getLatestCheckpoint(false)),
+                                singleThreadMainThreadExecutor)
+                        .join();
 
         MatcherAssert.assertThat(savepoint, notNullValue());
 
         MatcherAssert.assertThat(savepoint.getCheckpointID(), 
Matchers.is(savepointId));
     }
 
+    @Nonnull
+    private TaskManagerGateway 
createWaitingForTaskSubmissionTaskManagerGateway(
+            OneShotLatch submitTaskLatch) {
+        final TaskManagerGateway taskManagerGateway =
+                SlotPoolTestUtils.createTaskManagerGateway(
+                        new TestingTaskExecutorGatewayBuilder()
+                                .setSubmitTaskConsumer(
+                                        (taskDeploymentDescriptor, 
jobMasterId) -> {
+                                            submitTaskLatch.trigger();
+                                            return 
CompletableFuture.completedFuture(
+                                                    Acknowledge.get());
+                                        })
+                                .createTestingTaskExecutorGateway());
+        return taskManagerGateway;
+    }
+
+    @Test
+    public void 
testTryToAssignSlotsReturnsNotPossibleIfExpectedResourcesAreNotAvailable()
+            throws Exception {
+
+        final TestingSlotAllocator slotAllocator =
+                TestingSlotAllocator.newBuilder()
+                        .setTryReserveResourcesFunction(ignored -> 
Optional.empty())
+                        .build();
+
+        final AdaptiveScheduler adaptiveScheduler =
+                new AdaptiveSchedulerBuilder(createJobGraph(), 
mainThreadExecutor)
+                        .setSlotAllocator(slotAllocator)
+                        .build();
+
+        final CreatingExecutionGraph.AssignmentResult assignmentResult =
+                adaptiveScheduler.tryToAssignSlots(
+                        
CreatingExecutionGraph.ExecutionGraphWithVertexParallelism.create(
+                                new StateTrackingMockExecutionGraph(),
+                                new 
CreatingExecutionGraphTest.TestingVertexParallelism()));
+
+        assertFalse(assignmentResult.isSuccess());
+    }
+
     // 
---------------------------------------------------------------------------------------------
     // Utils
     // 
---------------------------------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java
index a352734..12d56da 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java
@@ -23,6 +23,8 @@ import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.core.testutils.CompletedScheduledFuture;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.scheduler.adaptive.allocator.VertexParallelism;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
 
@@ -31,6 +33,7 @@ import org.junit.Test;
 import javax.annotation.Nullable;
 
 import java.time.Duration;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledFuture;
 import java.util.function.Consumer;
@@ -88,52 +91,61 @@ public class CreatingExecutionGraphTest extends TestLogger {
     @Test
     public void testFailedExecutionGraphCreationTransitionsToFinished() throws 
Exception {
         try (MockCreatingExecutionGraphContext context = new 
MockCreatingExecutionGraphContext()) {
-            final CompletableFuture<ExecutionGraph> executionGraphFuture =
-                    new CompletableFuture<>();
+            final 
CompletableFuture<CreatingExecutionGraph.ExecutionGraphWithVertexParallelism>
+                    executionGraphWithVertexParallelismFuture = new 
CompletableFuture<>();
             final CreatingExecutionGraph creatingExecutionGraph =
-                    new CreatingExecutionGraph(context, executionGraphFuture, 
log);
+                    new CreatingExecutionGraph(
+                            context, 
executionGraphWithVertexParallelismFuture, log);
 
             context.setExpectFinished(
                     archivedExecutionGraph ->
                             assertThat(archivedExecutionGraph.getState(), 
is(JobStatus.FAILED)));
 
-            executionGraphFuture.completeExceptionally(new 
FlinkException("Test exception"));
+            executionGraphWithVertexParallelismFuture.completeExceptionally(
+                    new FlinkException("Test exception"));
         }
     }
 
     @Test
     public void 
testNotPossibleSlotAssignmentTransitionsToWaitingForResources() throws 
Exception {
         try (MockCreatingExecutionGraphContext context = new 
MockCreatingExecutionGraphContext()) {
-            final CompletableFuture<ExecutionGraph> executionGraphFuture =
-                    new CompletableFuture<>();
+            final 
CompletableFuture<CreatingExecutionGraph.ExecutionGraphWithVertexParallelism>
+                    executionGraphWithVertexParallelismFuture = new 
CompletableFuture<>();
             final CreatingExecutionGraph creatingExecutionGraph =
-                    new CreatingExecutionGraph(context, executionGraphFuture, 
log);
+                    new CreatingExecutionGraph(
+                            context, 
executionGraphWithVertexParallelismFuture, log);
 
             context.setTryToAssignSlotsFunction(
                     ignored -> 
CreatingExecutionGraph.AssignmentResult.notPossible());
             context.setExpectWaitingForResources();
 
-            executionGraphFuture.complete(new 
StateTrackingMockExecutionGraph());
+            executionGraphWithVertexParallelismFuture.complete(
+                    
CreatingExecutionGraph.ExecutionGraphWithVertexParallelism.create(
+                            new StateTrackingMockExecutionGraph(), new 
TestingVertexParallelism()));
         }
     }
 
     @Test
     public void testSuccessfulSlotAssignmentTransitionsToExecuting() throws 
Exception {
         try (MockCreatingExecutionGraphContext context = new 
MockCreatingExecutionGraphContext()) {
-            final CompletableFuture<ExecutionGraph> executionGraphFuture =
-                    new CompletableFuture<>();
+            final 
CompletableFuture<CreatingExecutionGraph.ExecutionGraphWithVertexParallelism>
+                    executionGraphWithvertexParallelismFuture = new 
CompletableFuture<>();
             final CreatingExecutionGraph creatingExecutionGraph =
-                    new CreatingExecutionGraph(context, executionGraphFuture, 
log);
+                    new CreatingExecutionGraph(
+                            context, 
executionGraphWithvertexParallelismFuture, log);
 
             final StateTrackingMockExecutionGraph executionGraph =
                     new StateTrackingMockExecutionGraph();
 
-            
context.setTryToAssignSlotsFunction(CreatingExecutionGraph.AssignmentResult::success);
+            context.setTryToAssignSlotsFunction(
+                    e -> 
CreatingExecutionGraph.AssignmentResult.success(e.getExecutionGraph()));
             context.setExpectedExecuting(
                     actualExecutionGraph ->
                             assertThat(actualExecutionGraph, 
sameInstance(executionGraph)));
 
-            executionGraphFuture.complete(executionGraph);
+            executionGraphWithvertexParallelismFuture.complete(
+                    
CreatingExecutionGraph.ExecutionGraphWithVertexParallelism.create(
+                            executionGraph, new TestingVertexParallelism()));
         }
     }
 
@@ -146,8 +158,11 @@ public class CreatingExecutionGraphTest extends TestLogger 
{
         private final StateValidator<ExecutionGraph> executingStateValidator =
                 new StateValidator<>("Executing");
 
-        private Function<ExecutionGraph, 
CreatingExecutionGraph.AssignmentResult>
-                tryToAssignSlotsFunction = 
CreatingExecutionGraph.AssignmentResult::success;
+        private Function<
+                        
CreatingExecutionGraph.ExecutionGraphWithVertexParallelism,
+                        CreatingExecutionGraph.AssignmentResult>
+                tryToAssignSlotsFunction =
+                        e -> 
CreatingExecutionGraph.AssignmentResult.success(e.getExecutionGraph());
 
         private boolean hadStateTransitionHappened = false;
 
@@ -164,7 +179,9 @@ public class CreatingExecutionGraphTest extends TestLogger {
         }
 
         public void setTryToAssignSlotsFunction(
-                Function<ExecutionGraph, 
CreatingExecutionGraph.AssignmentResult>
+                Function<
+                                
CreatingExecutionGraph.ExecutionGraphWithVertexParallelism,
+                                CreatingExecutionGraph.AssignmentResult>
                         tryToAssignSlotsFunction) {
             this.tryToAssignSlotsFunction = tryToAssignSlotsFunction;
         }
@@ -199,8 +216,9 @@ public class CreatingExecutionGraphTest extends TestLogger {
 
         @Override
         public CreatingExecutionGraph.AssignmentResult tryToAssignSlots(
-                ExecutionGraph executionGraph) {
-            return tryToAssignSlotsFunction.apply(executionGraph);
+                CreatingExecutionGraph.ExecutionGraphWithVertexParallelism
+                        executionGraphWithVertexParallelism) {
+            return 
tryToAssignSlotsFunction.apply(executionGraphWithVertexParallelism);
         }
 
         @Override
@@ -216,4 +234,17 @@ public class CreatingExecutionGraphTest extends TestLogger 
{
             executingStateValidator.close();
         }
     }
+
+    static final class TestingVertexParallelism implements VertexParallelism {
+
+        @Override
+        public Map<JobVertexID, Integer> getMaxParallelismForVertices() {
+            throw new UnsupportedOperationException("Is not supported");
+        }
+
+        @Override
+        public int getParallelism(JobVertexID jobVertexId) {
+            throw new UnsupportedOperationException("Is not supported");
+        }
+    }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java
index 76f04f0..f8a1876 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.scheduler.adaptive;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.core.testutils.CompletedScheduledFuture;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.blob.PermanentBlobKey;
@@ -65,6 +66,7 @@ import java.time.Duration;
 import java.util.Collections;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ScheduledFuture;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Supplier;
@@ -289,6 +291,20 @@ public class ExecutingTest extends TestLogger {
         }
     }
 
+    @Test
+    public void testExecutingChecksForNewResourcesWhenBeingCreated() throws 
Exception {
+        try (MockExecutingContext context = new MockExecutingContext()) {
+            context.setCanScaleUp(() -> true);
+            context.setExpectRestarting(
+                    restartingArguments -> {
+                        // expect immediate restart on scale up
+                        assertThat(restartingArguments.getBackoffTime(), 
is(Duration.ZERO));
+                    });
+
+            final Executing executing = new 
ExecutingStateBuilder().build(context);
+        }
+    }
+
     private final class ExecutingStateBuilder {
         private ExecutionGraph executionGraph =
                 TestingDefaultExecutionGraphBuilder.newBuilder().build();
@@ -344,7 +360,7 @@ public class ExecutingTest extends TestLogger {
                 new StateValidator<>("cancelling");
 
         private Function<Throwable, Executing.FailureResult> 
howToHandleFailure;
-        private Supplier<Boolean> canScaleUp;
+        private Supplier<Boolean> canScaleUp = () -> false;
 
         public void setExpectFailing(Consumer<FailingArguments> asserter) {
             failingStateValidator.expectInput(asserter);
@@ -420,6 +436,15 @@ public class ExecutingTest extends TestLogger {
         }
 
         @Override
+        public ScheduledFuture<?> runIfState(State expectedState, Runnable 
action, Duration delay) {
+            if (!hadStateTransition) {
+                action.run();
+            }
+
+            return CompletedScheduledFuture.create(null);
+        }
+
+        @Override
         public void close() throws Exception {
             super.close();
             failingStateValidator.close();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateTrackingMockExecutionGraph.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateTrackingMockExecutionGraph.java
index fa7eaa3..ca28a16 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateTrackingMockExecutionGraph.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateTrackingMockExecutionGraph.java
@@ -226,12 +226,14 @@ class StateTrackingMockExecutionGraph implements 
ExecutionGraph {
         return new StringifiedAccumulatorResult[0];
     }
 
-    // -- remaining interface implementations: all unsupported
+    @Override
+    public void start(@Nonnull ComponentMainThreadExecutor 
jobMasterMainThreadExecutor) {}
 
     @Override
-    public void start(@Nonnull ComponentMainThreadExecutor 
jobMasterMainThreadExecutor) {
-        throw new UnsupportedOperationException();
-    }
+    public void setInternalTaskFailuresListener(
+            InternalFailuresListener internalTaskFailuresListener) {}
+
+    // -- remaining interface implementations: all unsupported
 
     @Override
     public SchedulingTopology getSchedulingTopology() {
@@ -303,12 +305,6 @@ class StateTrackingMockExecutionGraph implements 
ExecutionGraph {
     }
 
     @Override
-    public void setInternalTaskFailuresListener(
-            InternalFailuresListener internalTaskFailuresListener) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
     public void attachJobGraph(List<JobVertex> topologiallySorted) throws 
JobException {
         throw new UnsupportedOperationException();
     }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestingSlotAllocator.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestingSlotAllocator.java
new file mode 100644
index 0000000..aa6fd7b
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestingSlotAllocator.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.allocator;
+
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.util.ResourceCounter;
+
+import java.util.Collection;
+import java.util.Optional;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+/** Testing implementation of {@link SlotAllocator}. */
+public class TestingSlotAllocator implements SlotAllocator {
+
+    private final Function<Iterable<JobInformation.VertexInformation>, ResourceCounter>
+            calculateRequiredSlotsFunction;
+
+    private final BiFunction<
+                    JobInformation,
+                    Collection<? extends SlotInfo>,
+                    Optional<? extends VertexParallelism>>
+            determineParallelismFunction;
+
+    private final Function<VertexParallelism, Optional<ReservedSlots>> tryReserveResourcesFunction;
+
+    private TestingSlotAllocator(
+            Function<Iterable<JobInformation.VertexInformation>, ResourceCounter>
+                    calculateRequiredSlotsFunction,
+            BiFunction<
+                            JobInformation,
+                            Collection<? extends SlotInfo>,
+                            Optional<? extends VertexParallelism>>
+                    determineParallelismFunction,
+            Function<VertexParallelism, Optional<ReservedSlots>> tryReserveResourcesFunction) {
+        this.calculateRequiredSlotsFunction = calculateRequiredSlotsFunction;
+        this.determineParallelismFunction = determineParallelismFunction;
+        this.tryReserveResourcesFunction = tryReserveResourcesFunction;
+    }
+
+    @Override
+    public ResourceCounter calculateRequiredSlots(
+            Iterable<JobInformation.VertexInformation> vertices) {
+        return calculateRequiredSlotsFunction.apply(vertices);
+    }
+
+    @Override
+    public Optional<? extends VertexParallelism> determineParallelism(
+            JobInformation jobInformation, Collection<? extends SlotInfo> slots) {
+        return determineParallelismFunction.apply(jobInformation, slots);
+    }
+
+    @Override
+    public Optional<ReservedSlots> tryReserveResources(VertexParallelism vertexParallelism) {
+        return tryReserveResourcesFunction.apply(vertexParallelism);
+    }
+
+    public static Builder newBuilder() {
+        return new Builder();
+    }
+
+    /** Builder for the {@link TestingSlotAllocator}. */
+    public static final class Builder {
+        private Function<Iterable<JobInformation.VertexInformation>, ResourceCounter>
+                calculateRequiredSlotsFunction = ignored -> ResourceCounter.empty();
+        private BiFunction<
+                        JobInformation,
+                        Collection<? extends SlotInfo>,
+                        Optional<? extends VertexParallelism>>
+                determineParallelismFunction = (ignoredA, ignoredB) -> Optional.empty();
+        private Function<VertexParallelism, Optional<ReservedSlots>> tryReserveResourcesFunction =
+                ignored -> Optional.empty();
+
+        public Builder setCalculateRequiredSlotsFunction(
+                Function<Iterable<JobInformation.VertexInformation>, ResourceCounter>
+                        calculateRequiredSlotsFunction) {
+            this.calculateRequiredSlotsFunction = calculateRequiredSlotsFunction;
+            return this;
+        }
+
+        public Builder setDetermineParallelismFunction(
+                BiFunction<
+                                JobInformation,
+                                Collection<? extends SlotInfo>,
+                                Optional<? extends VertexParallelism>>
+                        determineParallelismFunction) {
+            this.determineParallelismFunction = determineParallelismFunction;
+            return this;
+        }
+
+        public Builder setTryReserveResourcesFunction(
+                Function<VertexParallelism, Optional<ReservedSlots>> tryReserveResourcesFunction) {
+            this.tryReserveResourcesFunction = tryReserveResourcesFunction;
+            return this;
+        }
+
+        public TestingSlotAllocator build() {
+            return new TestingSlotAllocator(
+                    calculateRequiredSlotsFunction,
+                    determineParallelismFunction,
+                    tryReserveResourcesFunction);
+        }
+    }
+}

Reply via email to