This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bc0f2c698b68960d227d5c7dc8a06a52b7138888
Author: Till Rohrmann <[email protected]>
AuthorDate: Sun Mar 14 00:12:38 2021 +0100

    [FLINK-21602] Add CreatingExecutionGraph state
    
    The CreatingExecutionGraph state models the asynchronous ExecutionGraph creation
    after enough resources have been acquired in the WaitingForResources state.
---
 .../scheduler/adaptive/AdaptiveScheduler.java      | 231 +++++++++++----------
 .../scheduler/adaptive/CreatingExecutionGraph.java | 231 +++++++++++++++++++++
 .../scheduler/adaptive/WaitingForResources.java    |  31 +--
 .../scheduler/adaptive/AdaptiveSchedulerTest.java  |   2 +-
 .../adaptive/CreatingExecutionGraphTest.java       | 215 +++++++++++++++++++
 .../adaptive/WaitingForResourcesTest.java          |  62 +-----
 6 files changed, 585 insertions(+), 187 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
index 0998faf..1c5763e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
@@ -149,6 +149,7 @@ public class AdaptiveScheduler
         implements SchedulerNG,
                 Created.Context,
                 WaitingForResources.Context,
+                CreatingExecutionGraph.Context,
                 Executing.Context,
                 Restarting.Context,
                 Failing.Context,
@@ -599,112 +600,6 @@ public class AdaptiveScheduler
     }
 
     @Override
-    public ExecutionGraph createExecutionGraphWithAvailableResources() throws 
Exception {
-        final ParallelismAndResourceAssignments 
parallelismAndResourceAssignments =
-                determineParallelismAndAssignResources(slotAllocator);
-
-        JobGraph adjustedJobGraph = jobInformation.copyJobGraph();
-        for (JobVertex vertex : adjustedJobGraph.getVertices()) {
-            
vertex.setParallelism(parallelismAndResourceAssignments.getParallelism(vertex.getID()));
-        }
-
-        final ExecutionGraph executionGraph = 
createExecutionGraphAndRestoreState(adjustedJobGraph);
-
-        executionGraph.start(componentMainThreadExecutor);
-        executionGraph.transitionToRunning();
-
-        executionGraph.setInternalTaskFailuresListener(
-                new UpdateSchedulerNgOnInternalFailuresListener(this));
-
-        for (ExecutionVertex executionVertex : 
executionGraph.getAllExecutionVertices()) {
-            final LogicalSlot assignedSlot =
-                    
parallelismAndResourceAssignments.getAssignedSlot(executionVertex.getID());
-            executionVertex
-                    .getCurrentExecutionAttempt()
-                    
.registerProducedPartitions(assignedSlot.getTaskManagerLocation(), false);
-            executionVertex.tryAssignResource(assignedSlot);
-        }
-        return executionGraph;
-    }
-
-    private ExecutionGraph createExecutionGraphAndRestoreState(JobGraph 
adjustedJobGraph)
-            throws Exception {
-        ExecutionDeploymentListener executionDeploymentListener =
-                new 
ExecutionDeploymentTrackerDeploymentListenerAdapter(executionDeploymentTracker);
-        ExecutionStateUpdateListener executionStateUpdateListener =
-                (execution, newState) -> {
-                    if (newState.isTerminal()) {
-                        
executionDeploymentTracker.stopTrackingDeploymentOf(execution);
-                    }
-                };
-
-        final ExecutionGraph newExecutionGraph =
-                DefaultExecutionGraphBuilder.buildGraph(
-                        adjustedJobGraph,
-                        configuration,
-                        futureExecutor,
-                        ioExecutor,
-                        userCodeClassLoader,
-                        completedCheckpointStore,
-                        checkpointsCleaner,
-                        checkpointIdCounter,
-                        rpcTimeout,
-                        jobManagerJobMetricGroup,
-                        blobWriter,
-                        LOG,
-                        shuffleMaster,
-                        partitionTracker,
-                        
TaskDeploymentDescriptorFactory.PartitionLocationConstraint
-                                .MUST_BE_KNOWN, // AdaptiveScheduler only 
supports streaming jobs
-                        executionDeploymentListener,
-                        executionStateUpdateListener,
-                        initializationTimestamp,
-                        vertexAttemptNumberStore);
-
-        final CheckpointCoordinator checkpointCoordinator =
-                newExecutionGraph.getCheckpointCoordinator();
-
-        if (checkpointCoordinator != null) {
-            // check whether we find a valid checkpoint
-            if (!checkpointCoordinator.restoreInitialCheckpointIfPresent(
-                    new 
HashSet<>(newExecutionGraph.getAllVertices().values()))) {
-
-                // check whether we can restore from a savepoint
-                tryRestoreExecutionGraphFromSavepoint(
-                        newExecutionGraph, 
adjustedJobGraph.getSavepointRestoreSettings());
-            }
-        }
-
-        return newExecutionGraph;
-    }
-
-    /**
-     * Tries to restore the given {@link ExecutionGraph} from the provided 
{@link
-     * SavepointRestoreSettings}, iff checkpointing is enabled.
-     *
-     * @param executionGraphToRestore {@link ExecutionGraph} which is supposed 
to be restored
-     * @param savepointRestoreSettings {@link SavepointRestoreSettings} 
containing information about
-     *     the savepoint to restore from
-     * @throws Exception if the {@link ExecutionGraph} could not be restored
-     */
-    private void tryRestoreExecutionGraphFromSavepoint(
-            ExecutionGraph executionGraphToRestore,
-            SavepointRestoreSettings savepointRestoreSettings)
-            throws Exception {
-        if (savepointRestoreSettings.restoreSavepoint()) {
-            final CheckpointCoordinator checkpointCoordinator =
-                    executionGraphToRestore.getCheckpointCoordinator();
-            if (checkpointCoordinator != null) {
-                checkpointCoordinator.restoreSavepoint(
-                        savepointRestoreSettings.getRestorePath(),
-                        savepointRestoreSettings.allowNonRestoredState(),
-                        executionGraphToRestore.getAllVertices(),
-                        userCodeClassLoader);
-            }
-        }
-    }
-
-    @Override
     public ArchivedExecutionGraph getArchivedExecutionGraph(
             JobStatus jobStatus, @Nullable Throwable cause) {
         return ArchivedExecutionGraph.createFromInitializingJob(
@@ -813,6 +708,125 @@ public class AdaptiveScheduler
     }
 
     @Override
+    public void goToCreatingExecutionGraph() {
+        CompletableFuture<ExecutionGraph> executionGraphFuture;
+
+        try {
+            final ExecutionGraph executionGraph = 
createExecutionGraphWithAvailableResources();
+            executionGraphFuture = 
CompletableFuture.completedFuture(executionGraph);
+        } catch (Exception exception) {
+            executionGraphFuture = 
FutureUtils.completedExceptionally(exception);
+        }
+
+        transitionToState(new CreatingExecutionGraph.Factory(this, 
executionGraphFuture, LOG));
+    }
+
+    ExecutionGraph createExecutionGraphWithAvailableResources() throws 
Exception {
+        final ParallelismAndResourceAssignments 
parallelismAndResourceAssignments =
+                determineParallelismAndAssignResources(slotAllocator);
+
+        JobGraph adjustedJobGraph = jobInformation.copyJobGraph();
+        for (JobVertex vertex : adjustedJobGraph.getVertices()) {
+            
vertex.setParallelism(parallelismAndResourceAssignments.getParallelism(vertex.getID()));
+        }
+
+        final ExecutionGraph executionGraph = 
createExecutionGraphAndRestoreState(adjustedJobGraph);
+
+        executionGraph.start(componentMainThreadExecutor);
+        executionGraph.transitionToRunning();
+
+        executionGraph.setInternalTaskFailuresListener(
+                new UpdateSchedulerNgOnInternalFailuresListener(this));
+
+        for (ExecutionVertex executionVertex : 
executionGraph.getAllExecutionVertices()) {
+            final LogicalSlot assignedSlot =
+                    
parallelismAndResourceAssignments.getAssignedSlot(executionVertex.getID());
+            executionVertex
+                    .getCurrentExecutionAttempt()
+                    
.registerProducedPartitions(assignedSlot.getTaskManagerLocation(), false);
+            executionVertex.tryAssignResource(assignedSlot);
+        }
+        return executionGraph;
+    }
+
+    private ExecutionGraph createExecutionGraphAndRestoreState(JobGraph 
adjustedJobGraph)
+            throws Exception {
+        ExecutionDeploymentListener executionDeploymentListener =
+                new 
ExecutionDeploymentTrackerDeploymentListenerAdapter(executionDeploymentTracker);
+        ExecutionStateUpdateListener executionStateUpdateListener =
+                (execution, newState) -> {
+                    if (newState.isTerminal()) {
+                        
executionDeploymentTracker.stopTrackingDeploymentOf(execution);
+                    }
+                };
+
+        final ExecutionGraph newExecutionGraph =
+                DefaultExecutionGraphBuilder.buildGraph(
+                        adjustedJobGraph,
+                        configuration,
+                        futureExecutor,
+                        ioExecutor,
+                        userCodeClassLoader,
+                        completedCheckpointStore,
+                        checkpointsCleaner,
+                        checkpointIdCounter,
+                        rpcTimeout,
+                        jobManagerJobMetricGroup,
+                        blobWriter,
+                        LOG,
+                        shuffleMaster,
+                        partitionTracker,
+                        
TaskDeploymentDescriptorFactory.PartitionLocationConstraint
+                                .MUST_BE_KNOWN, // AdaptiveScheduler only 
supports streaming jobs
+                        executionDeploymentListener,
+                        executionStateUpdateListener,
+                        initializationTimestamp,
+                        vertexAttemptNumberStore);
+
+        final CheckpointCoordinator checkpointCoordinator =
+                newExecutionGraph.getCheckpointCoordinator();
+
+        if (checkpointCoordinator != null) {
+            // check whether we find a valid checkpoint
+            if (!checkpointCoordinator.restoreInitialCheckpointIfPresent(
+                    new 
HashSet<>(newExecutionGraph.getAllVertices().values()))) {
+
+                // check whether we can restore from a savepoint
+                tryRestoreExecutionGraphFromSavepoint(
+                        newExecutionGraph, 
adjustedJobGraph.getSavepointRestoreSettings());
+            }
+        }
+
+        return newExecutionGraph;
+    }
+
+    /**
+     * Tries to restore the given {@link ExecutionGraph} from the provided 
{@link
+     * SavepointRestoreSettings}, iff checkpointing is enabled.
+     *
+     * @param executionGraphToRestore {@link ExecutionGraph} which is supposed 
to be restored
+     * @param savepointRestoreSettings {@link SavepointRestoreSettings} 
containing information about
+     *     the savepoint to restore from
+     * @throws Exception if the {@link ExecutionGraph} could not be restored
+     */
+    private void tryRestoreExecutionGraphFromSavepoint(
+            ExecutionGraph executionGraphToRestore,
+            SavepointRestoreSettings savepointRestoreSettings)
+            throws Exception {
+        if (savepointRestoreSettings.restoreSavepoint()) {
+            final CheckpointCoordinator checkpointCoordinator =
+                    executionGraphToRestore.getCheckpointCoordinator();
+            if (checkpointCoordinator != null) {
+                checkpointCoordinator.restoreSavepoint(
+                        savepointRestoreSettings.getRestorePath(),
+                        savepointRestoreSettings.allowNonRestoredState(),
+                        executionGraphToRestore.getAllVertices(),
+                        userCodeClassLoader);
+            }
+        }
+    }
+
+    @Override
     public boolean canScaleUp(ExecutionGraph executionGraph) {
         int availableSlots = 
declarativeSlotPool.getFreeSlotsInformation().size();
 
@@ -936,6 +950,11 @@ public class AdaptiveScheduler
                 () -> runIfState(expectedState, action), delay.toMillis(), 
TimeUnit.MILLISECONDS);
     }
 
+    @Override
+    public CreatingExecutionGraph.AssignmentResult 
tryToAssignSlots(ExecutionGraph executionGraph) {
+        return CreatingExecutionGraph.AssignmentResult.success(executionGraph);
+    }
+
     // ----------------------------------------------------------------
 
     /** Note: Do not call this method from a State constructor or 
State#onLeave. */
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java
new file mode 100644
index 0000000..b6f8df4
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * State which waits for the creation of the {@link ExecutionGraph}. If the creation fails, then the
+ * state transitions to {@link Finished}. If the creation succeeds, then the system tries to assign
+ * the required slots. If the set of available slots has changed so that the created {@link
+ * ExecutionGraph} cannot be executed, the state transitions back into {@link WaitingForResources}.
+ * If there are enough slots for the {@link ExecutionGraph} to run, the state transitions to {@link
+ * Executing}.
+ */
+public class CreatingExecutionGraph implements State {
+
+    private final Context context;
+
+    private final Logger log;
+
+    public CreatingExecutionGraph(
+            Context context, CompletableFuture<ExecutionGraph> 
executionGraphFuture, Logger log) {
+        this.context = context;
+        this.log = log;
+
+        FutureUtils.assertNoException(
+                executionGraphFuture.handle(
+                        (executionGraph, throwable) -> {
+                            context.runIfState(
+                                    this,
+                                    () -> 
handleExecutionGraphCreation(executionGraph, throwable),
+                                    Duration.ZERO);
+                            return null;
+                        }));
+    }
+
+    private void handleExecutionGraphCreation(
+            @Nullable ExecutionGraph executionGraph, @Nullable Throwable 
throwable) {
+        if (throwable != null) {
+            log.info(
+                    "Failed to go from {} to {} because the ExecutionGraph 
creation failed.",
+                    CreatingExecutionGraph.class.getSimpleName(),
+                    Executing.class.getSimpleName(),
+                    throwable);
+            
context.goToFinished(context.getArchivedExecutionGraph(JobStatus.FAILED, 
throwable));
+        } else {
+            final AssignmentResult result = 
context.tryToAssignSlots(executionGraph);
+
+            if (result.isSuccess()) {
+                context.goToExecuting(result.getExecutionGraph());
+            } else {
+                context.goToWaitingForResources();
+            }
+        }
+    }
+
+    @Override
+    public void cancel() {
+        
context.goToFinished(context.getArchivedExecutionGraph(JobStatus.CANCELED, 
null));
+    }
+
+    @Override
+    public void suspend(Throwable cause) {
+        
context.goToFinished(context.getArchivedExecutionGraph(JobStatus.SUSPENDED, 
cause));
+    }
+
+    @Override
+    public JobStatus getJobStatus() {
+        return JobStatus.INITIALIZING;
+    }
+
+    @Override
+    public ArchivedExecutionGraph getJob() {
+        return context.getArchivedExecutionGraph(getJobStatus(), null);
+    }
+
+    @Override
+    public void handleGlobalFailure(Throwable cause) {
+        
context.goToFinished(context.getArchivedExecutionGraph(JobStatus.FAILED, 
cause));
+    }
+
+    @Override
+    public Logger getLogger() {
+        return log;
+    }
+
+    /** Context for the {@link CreatingExecutionGraph} state. */
+    interface Context {
+
+        /**
+         * Transitions into the {@link Finished} state.
+         *
+         * @param archivedExecutionGraph archivedExecutionGraph representing 
the final job state
+         */
+        void goToFinished(ArchivedExecutionGraph archivedExecutionGraph);
+
+        /**
+         * Transitions into the {@link Executing} state.
+         *
+         * @param executionGraph executionGraph which is passed to the {@link 
Executing} state
+         */
+        void goToExecuting(ExecutionGraph executionGraph);
+
+        /** Transitions into the {@link WaitingForResources} state. */
+        void goToWaitingForResources();
+
+        /**
+         * Creates the {@link ArchivedExecutionGraph} for the given job status 
and cause. Cause can
+         * be null if there is no failure.
+         *
+         * @param jobStatus jobStatus to initialize the {@link 
ArchivedExecutionGraph} with
+         * @param cause cause describing a failure cause; {@code null} if 
there is none
+         * @return the created {@link ArchivedExecutionGraph}
+         */
+        ArchivedExecutionGraph getArchivedExecutionGraph(
+                JobStatus jobStatus, @Nullable Throwable cause);
+
+        /**
+         * Runs the given action after a delay if the state at this time 
equals the expected state.
+         *
+         * @param expectedState expectedState describes the required state at 
the time of running
+         *     the action
+         * @param action action to run if the expected state equals the actual 
state
+         * @param delay delay after which to run the action
+         */
+        void runIfState(State expectedState, Runnable action, Duration delay);
+
+        /**
+         * Try to assign slots to the created {@link ExecutionGraph}. If it is 
possible, then this
+         * method returns a successful {@link AssignmentResult} which contains 
the assigned {@link
+         * ExecutionGraph}. If not, then the assignment result is a failure.
+         *
+         * @param executionGraph executionGraph to assign slots to
+         * @return {@link AssignmentResult} representing the result of the 
assignment
+         */
+        AssignmentResult tryToAssignSlots(ExecutionGraph executionGraph);
+    }
+
+    /**
+     * Class representing the assignment result of the slots to the {@link ExecutionGraph}. The
+     * assignment is either successful or not possible. If it is successful, the assignment also
+     * contains the assigned {@link ExecutionGraph}.
+     */
+    static final class AssignmentResult {
+
+        private static final AssignmentResult NOT_POSSIBLE = new 
AssignmentResult(null);
+
+        @Nullable private final ExecutionGraph executionGraph;
+
+        private AssignmentResult(@Nullable ExecutionGraph executionGraph) {
+            this.executionGraph = executionGraph;
+        }
+
+        boolean isSuccess() {
+            return executionGraph != null;
+        }
+
+        ExecutionGraph getExecutionGraph() {
+            Preconditions.checkState(
+                    isSuccess(), "Can only return the ExecutionGraph if it is 
a success.");
+            return executionGraph;
+        }
+
+        static AssignmentResult success(ExecutionGraph executionGraph) {
+            return new AssignmentResult(
+                    Preconditions.checkNotNull(
+                            executionGraph,
+                            "AssignmentResult.success expects a non-null 
ExecutionGraph."));
+        }
+
+        static AssignmentResult notPossible() {
+            return NOT_POSSIBLE;
+        }
+    }
+
+    /** Factory for the {@link CreatingExecutionGraph} state. */
+    static class Factory implements StateFactory<CreatingExecutionGraph> {
+
+        private final Context context;
+
+        private final CompletableFuture<ExecutionGraph> executionGraphFuture;
+
+        private final Logger log;
+
+        Factory(
+                Context context,
+                CompletableFuture<ExecutionGraph> executionGraphFuture,
+                Logger log) {
+            this.context = context;
+            this.executionGraphFuture = executionGraphFuture;
+            this.log = log;
+        }
+
+        @Override
+        public Class<CreatingExecutionGraph> getStateClass() {
+            return CreatingExecutionGraph.class;
+        }
+
+        @Override
+        public CreatingExecutionGraph getState() {
+            return new CreatingExecutionGraph(context, executionGraphFuture, 
log);
+        }
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResources.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResources.java
index 6f118c0..848d729 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResources.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResources.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.scheduler.adaptive;
 
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.util.ResourceCounter;
 import org.apache.flink.util.Preconditions;
 
@@ -101,19 +100,7 @@ class WaitingForResources implements State, 
ResourceConsumer {
     }
 
     private void createExecutionGraphWithAvailableResources() {
-        try {
-            final ExecutionGraph executionGraph =
-                    context.createExecutionGraphWithAvailableResources();
-
-            context.goToExecuting(executionGraph);
-        } catch (Exception exception) {
-            log.info(
-                    "Failed to go from {} to {} because the ExecutionGraph 
creation failed.",
-                    WaitingForResources.class.getSimpleName(),
-                    Executing.class.getSimpleName(),
-                    exception);
-            
context.goToFinished(context.getArchivedExecutionGraph(JobStatus.FAILED, 
exception));
-        }
+        context.goToCreatingExecutionGraph();
     }
 
     /** Context of the {@link WaitingForResources} state. */
@@ -126,12 +113,8 @@ class WaitingForResources implements State, 
ResourceConsumer {
          */
         void goToFinished(ArchivedExecutionGraph archivedExecutionGraph);
 
-        /**
-         * Transitions into the {@link Executing} state.
-         *
-         * @param executionGraph executionGraph which is passed to the {@link 
Executing} state
-         */
-        void goToExecuting(ExecutionGraph executionGraph);
+        /** Transitions into the {@link CreatingExecutionGraph} state. */
+        void goToCreatingExecutionGraph();
 
         /**
          * Creates the {@link ArchivedExecutionGraph} for the given job status 
and cause. Cause can
@@ -153,14 +136,6 @@ class WaitingForResources implements State, 
ResourceConsumer {
         boolean hasEnoughResources(ResourceCounter desiredResources);
 
         /**
-         * Creates an {@link ExecutionGraph} with the available resources.
-         *
-         * @return the created {@link ExecutionGraph}
-         * @throws Exception if the creation of the {@link ExecutionGraph} 
fails
-         */
-        ExecutionGraph createExecutionGraphWithAvailableResources() throws 
Exception;
-
-        /**
          * Runs the given action after a delay if the state at this time 
equals the expected state.
          *
          * @param expectedState expectedState describes the required state at 
the time of running
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
index ec4d4b2..4958a37 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
@@ -479,7 +479,7 @@ public class AdaptiveSchedulerTest extends TestLogger {
                 createSlotOffersForResourceRequirements(
                         ResourceCounter.withResource(ResourceProfile.UNKNOWN, 
PARALLELISM)));
 
-        assertThat(scheduler.getState(), instanceOf(Executing.class));
+        assertThat(scheduler.getState(), 
instanceOf(CreatingExecutionGraph.class));
     }
 
     @Test
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java
new file mode 100644
index 0000000..3d467fb
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.sameInstance;
+import static org.junit.Assert.assertThat;
+
+/** Tests for the {@link CreatingExecutionGraph} state. */
+public class CreatingExecutionGraphTest extends TestLogger {
+
+    @Test
+    public void testCancelTransitionsToFinished() throws Exception { // cancel() is invoked while the graph future is still pending
+        try (MockCreatingExecutionGraphContext context = new 
MockCreatingExecutionGraphContext()) {
+            final CreatingExecutionGraph creatingExecutionGraph =
+                    new CreatingExecutionGraph(context, new 
CompletableFuture<>(), log); // this future is never completed by the test
+
+            context.setExpectFinished( // expectation is registered before the call under test
+                    archivedExecutionGraph ->
+                            assertThat(archivedExecutionGraph.getState(), 
is(JobStatus.CANCELED)));
+
+            creatingExecutionGraph.cancel();
+        } // NOTE(review): unmet expectations presumably surface in context.close() - confirm in StateValidator
+    }
+
+    @Test
+    public void testSuspendTransitionsToFinished() throws Exception { // suspend(...) is invoked while the graph future is still pending
+        try (MockCreatingExecutionGraphContext context = new 
MockCreatingExecutionGraphContext()) {
+            final CreatingExecutionGraph creatingExecutionGraph =
+                    new CreatingExecutionGraph(context, new 
CompletableFuture<>(), log); // this future is never completed by the test
+
+            context.setExpectFinished( // the archived graph handed to goToFinished must report SUSPENDED
+                    archivedExecutionGraph ->
+                            assertThat(archivedExecutionGraph.getState(), 
is(JobStatus.SUSPENDED)));
+
+            creatingExecutionGraph.suspend(new FlinkException("Job has been 
suspended."));
+        }
+    }
+
+    @Test
+    public void testGlobalFailureTransitionsToFinished() throws Exception { // a global failure is reported while the graph future is still pending
+        try (MockCreatingExecutionGraphContext context = new 
MockCreatingExecutionGraphContext()) {
+            final CreatingExecutionGraph creatingExecutionGraph =
+                    new CreatingExecutionGraph(context, new 
CompletableFuture<>(), log); // this future is never completed by the test
+
+            context.setExpectFinished( // the archived graph handed to goToFinished must report FAILED
+                    archivedExecutionGraph ->
+                            assertThat(archivedExecutionGraph.getState(), 
is(JobStatus.FAILED)));
+
+            creatingExecutionGraph.handleGlobalFailure(new 
FlinkException("Test exception"));
+        }
+    }
+
+    @Test
+    public void testFailedExecutionGraphCreationTransitionsToFinished() throws 
Exception { // the trigger here is the future failing, not a call on the state object
+        try (MockCreatingExecutionGraphContext context = new 
MockCreatingExecutionGraphContext()) {
+            final CompletableFuture<ExecutionGraph> executionGraphFuture =
+                    new CompletableFuture<>(); // kept in a local so the test can fail it below
+            final CreatingExecutionGraph creatingExecutionGraph =
+                    new CreatingExecutionGraph(context, executionGraphFuture, 
log); // not referenced again; NOTE(review): presumably the constructor hooks into the future - confirm
+
+            context.setExpectFinished( // the archived graph handed to goToFinished must report FAILED
+                    archivedExecutionGraph ->
+                            assertThat(archivedExecutionGraph.getState(), 
is(JobStatus.FAILED)));
+
+            executionGraphFuture.completeExceptionally(new 
FlinkException("Test exception"));
+        }
+    }
+
+    @Test
+    public void 
testNotPossibleSlotAssignmentTransitionsToWaitingForResources() throws 
Exception { // graph creation succeeds, but the stubbed slot assignment reports notPossible()
+        try (MockCreatingExecutionGraphContext context = new 
MockCreatingExecutionGraphContext()) {
+            final CompletableFuture<ExecutionGraph> executionGraphFuture =
+                    new CompletableFuture<>();
+            final CreatingExecutionGraph creatingExecutionGraph =
+                    new CreatingExecutionGraph(context, executionGraphFuture, 
log); // not referenced again; the test drives it through the future only
+
+            context.setTryToAssignSlotsFunction( // replaces the mock's default (AssignmentResult::success)
+                    ignored -> 
CreatingExecutionGraph.AssignmentResult.notPossible());
+            context.setExpectWaitingForResources(); // expected target state: WaitingForResources
+
+            executionGraphFuture.complete(new 
StateTrackingMockExecutionGraph());
+        }
+    }
+
+    @Test
+    public void testSuccessfulSlotAssignmentTransitionsToExecuting() throws 
Exception { // graph creation and slot assignment both succeed
+        try (MockCreatingExecutionGraphContext context = new 
MockCreatingExecutionGraphContext()) {
+            final CompletableFuture<ExecutionGraph> executionGraphFuture =
+                    new CompletableFuture<>();
+            final CreatingExecutionGraph creatingExecutionGraph =
+                    new CreatingExecutionGraph(context, executionGraphFuture, 
log); // not referenced again; the test drives it through the future only
+
+            final StateTrackingMockExecutionGraph executionGraph =
+                    new StateTrackingMockExecutionGraph(); // kept in a local for the identity check below
+
+            
context.setTryToAssignSlotsFunction(CreatingExecutionGraph.AssignmentResult::success);
+            context.setExpectedExecuting( // goToExecuting must receive the very object the future was completed with
+                    actualExecutionGraph ->
+                            assertThat(actualExecutionGraph, 
sameInstance(executionGraph)));
+
+            executionGraphFuture.complete(executionGraph);
+        }
+    }
+
+    static class MockCreatingExecutionGraphContext // test double for CreatingExecutionGraph.Context: one StateValidator per target state
+            implements CreatingExecutionGraph.Context, AutoCloseable {
+        private final StateValidator<ArchivedExecutionGraph> 
finishedStateValidator =
+                new StateValidator<>("Finished");
+        private final StateValidator<Void> waitingForResourcesStateValidator =
+                new StateValidator<>("WaitingForResources"); // Void: this transition carries no payload
+        private final StateValidator<ExecutionGraph> executingStateValidator =
+                new StateValidator<>("Executing");
+
+        private Function<ExecutionGraph, 
CreatingExecutionGraph.AssignmentResult>
+                tryToAssignSlotsFunction = 
CreatingExecutionGraph.AssignmentResult::success; // default stub; replaceable via setTryToAssignSlotsFunction
+
+        private boolean hadStateTransitionHappened = false; // set to true by every goTo* method below
+
+        public void setExpectFinished(Consumer<ArchivedExecutionGraph> 
asserter) { // asserter is applied to the graph passed to goToFinished
+            finishedStateValidator.expectInput(asserter);
+        }
+
+        public void setExpectWaitingForResources() {
+            waitingForResourcesStateValidator.expectInput((none) -> {}); // nothing to assert on: the input is always null
+        }
+
+        public void setExpectedExecuting(Consumer<ExecutionGraph> asserter) { // asserter is applied to the graph passed to goToExecuting
+            executingStateValidator.expectInput(asserter);
+        }
+
+        public void setTryToAssignSlotsFunction(
+                Function<ExecutionGraph, 
CreatingExecutionGraph.AssignmentResult>
+                        tryToAssignSlotsFunction) {
+            this.tryToAssignSlotsFunction = tryToAssignSlotsFunction;
+        }
+
+        @Override
+        public void goToFinished(ArchivedExecutionGraph 
archivedExecutionGraph) {
+            finishedStateValidator.validateInput(archivedExecutionGraph);
+            hadStateTransitionHappened = true; // recorded only after validation returned
+        }
+
+        @Override
+        public void goToExecuting(ExecutionGraph executionGraph) {
+            executingStateValidator.validateInput(executionGraph);
+            hadStateTransitionHappened = true;
+        }
+
+        @Override
+        public ArchivedExecutionGraph getArchivedExecutionGraph(
+                JobStatus jobStatus, @Nullable Throwable cause) {
+            return ArchivedExecutionGraph.createFromInitializingJob( // new JobID on every call, fixed job name "testJob"
+                    new JobID(), "testJob", jobStatus, cause, 0L);
+        }
+
+        @Override
+        public void runIfState(State expectedState, Runnable action, Duration 
delay) { // expectedState and delay are ignored by this mock
+            if (!hadStateTransitionHappened) { // run synchronously, but only while no goTo* call has been recorded
+                action.run();
+            }
+        }
+
+        @Override
+        public CreatingExecutionGraph.AssignmentResult tryToAssignSlots(
+                ExecutionGraph executionGraph) {
+            return tryToAssignSlotsFunction.apply(executionGraph); // delegates to the configurable stub
+        }
+
+        @Override
+        public void goToWaitingForResources() {
+            waitingForResourcesStateValidator.validateInput(null); // no payload for this transition
+            hadStateTransitionHappened = true;
+        }
+
+        @Override
+        public void close() throws Exception { // NOTE(review): presumably where unmet expectations surface - confirm in StateValidator
+            finishedStateValidator.close();
+            waitingForResourcesStateValidator.close();
+            executingStateValidator.close();
+        }
+    }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResourcesTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResourcesTest.java
index 05135bb..8586551 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResourcesTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResourcesTest.java
@@ -22,13 +22,9 @@ import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ErrorInfo;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import 
org.apache.flink.runtime.executiongraph.TestingDefaultExecutionGraphBuilder;
 import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
 import org.apache.flink.runtime.util.ResourceCounter;
-import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
-import org.apache.flink.util.function.SupplierWithException;
 
 import org.junit.Test;
 
@@ -53,11 +49,11 @@ public class WaitingForResourcesTest extends TestLogger {
 
     /** WaitingForResources is transitioning to Executing if there are enough 
resources. */
     @Test
-    public void testTransitionToExecuting() throws Exception {
+    public void testTransitionToCreatingExecutionGraph() throws Exception {
         try (MockContext ctx = new MockContext()) {
             ctx.setHasEnoughResources(() -> true);
 
-            ctx.setExpectExecuting(assertNonNull());
+            ctx.setExpectCreatingExecutionGraph();
 
             new WaitingForResources(ctx, log, RESOURCE_COUNTER, Duration.ZERO);
             // run delayed actions
@@ -71,31 +67,6 @@ public class WaitingForResourcesTest extends TestLogger {
     }
 
     @Test
-    public void 
testTransitionToFinishedOnExecutionGraphInitializationFailure() throws 
Exception {
-        try (MockContext ctx = new MockContext()) {
-            ctx.setHasEnoughResources(() -> true);
-            ctx.setCreateExecutionGraphWithAvailableResources(
-                    () -> {
-                        throw new RuntimeException("Test exception");
-                    });
-
-            ctx.setExpectFinished(
-                    (archivedExecutionGraph -> {
-                        assertThat(archivedExecutionGraph.getState(), 
is(JobStatus.FAILED));
-                    }));
-
-            new WaitingForResources(ctx, log, RESOURCE_COUNTER, Duration.ZERO);
-
-            // run delayed actions
-            for (ScheduledRunnable scheduledRunnable : 
ctx.getScheduledRunnables()) {
-                if (!ctx.hasStateTransition()) {
-                    scheduledRunnable.runAction();
-                }
-            }
-        }
-    }
-
-    @Test
     public void testNotEnoughResources() throws Exception {
         try (MockContext ctx = new MockContext()) {
             ctx.setHasEnoughResources(() -> false);
@@ -114,7 +85,7 @@ public class WaitingForResourcesTest extends TestLogger {
             WaitingForResources wfr =
                     new WaitingForResources(ctx, log, RESOURCE_COUNTER, 
Duration.ZERO);
             ctx.setHasEnoughResources(() -> true); // make resources available
-            ctx.setExpectExecuting(assertNonNull());
+            ctx.setExpectCreatingExecutionGraph();
             wfr.notifyNewResourcesAvailable(); // .. and notify
         }
     }
@@ -126,7 +97,7 @@ public class WaitingForResourcesTest extends TestLogger {
             WaitingForResources wfr =
                     new WaitingForResources(ctx, log, RESOURCE_COUNTER, 
Duration.ZERO);
 
-            ctx.setExpectExecuting(assertNonNull());
+            ctx.setExpectCreatingExecutionGraph();
 
             // immediately execute all scheduled runnables
             assertThat(ctx.getScheduledRunnables().size(), greaterThan(0));
@@ -195,15 +166,12 @@ public class WaitingForResourcesTest extends TestLogger {
 
     private static class MockContext implements WaitingForResources.Context, 
AutoCloseable {
 
-        private final StateValidator<ExecutionGraph> executingStateValidator =
+        private final StateValidator<Void> 
creatingExecutionGraphStateValidator =
                 new StateValidator<>("executing");
         private final StateValidator<ArchivedExecutionGraph> 
finishedStateValidator =
                 new StateValidator<>("finished");
 
         private Supplier<Boolean> hasEnoughResourcesSupplier = () -> false;
-        private SupplierWithException<ExecutionGraph, FlinkException>
-                createExecutionGraphWithAvailableResources =
-                        () -> 
TestingDefaultExecutionGraphBuilder.newBuilder().build();
         private final List<ScheduledRunnable> scheduledRunnables = new 
ArrayList<>();
         private boolean hasStateTransition = false;
 
@@ -215,22 +183,17 @@ public class WaitingForResourcesTest extends TestLogger {
             hasEnoughResourcesSupplier = sup;
         }
 
-        public void setCreateExecutionGraphWithAvailableResources(
-                SupplierWithException<ExecutionGraph, FlinkException> sup) {
-            this.createExecutionGraphWithAvailableResources = sup;
-        }
-
         void setExpectFinished(Consumer<ArchivedExecutionGraph> asserter) {
             finishedStateValidator.expectInput(asserter);
         }
 
-        void setExpectExecuting(Consumer<ExecutionGraph> asserter) {
-            executingStateValidator.expectInput(asserter);
+        void setExpectCreatingExecutionGraph() {
+            creatingExecutionGraphStateValidator.expectInput(none -> {});
         }
 
         @Override
         public void close() throws Exception {
-            executingStateValidator.close();
+            creatingExecutionGraphStateValidator.close();
             finishedStateValidator.close();
         }
 
@@ -249,11 +212,6 @@ public class WaitingForResourcesTest extends TestLogger {
         }
 
         @Override
-        public ExecutionGraph createExecutionGraphWithAvailableResources() 
throws FlinkException {
-            return createExecutionGraphWithAvailableResources.get();
-        }
-
-        @Override
         public void runIfState(State expectedState, Runnable action, Duration 
delay) {
             scheduledRunnables.add(new ScheduledRunnable(expectedState, 
action, delay));
         }
@@ -265,8 +223,8 @@ public class WaitingForResourcesTest extends TestLogger {
         }
 
         @Override
-        public void goToExecuting(ExecutionGraph executionGraph) {
-            executingStateValidator.validateInput(executionGraph);
+        public void goToCreatingExecutionGraph() {
+            creatingExecutionGraphStateValidator.validateInput(null);
             hasStateTransition = true;
         }
 

Reply via email to