This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fb94512ae02df71cf6ef9255e96852b3220f5c9a
Author: Till Rohrmann <[email protected]>
AuthorDate: Tue Mar 16 11:57:11 2021 +0100

    [hotfix] Factor ExecutionGraph creation out into ExecutionGraphFactory
    
    Using the ExecutionGraphFactory for creating and restoring an ExecutionGraph allows
    to share the functionality between the DefaultScheduler and the AdaptiveScheduler.
---
 .../scheduler/DefaultExecutionGraphFactory.java    | 169 +++++++++++++++++++++
 .../flink/runtime/scheduler/DefaultScheduler.java  |  24 +--
 .../runtime/scheduler/DefaultSchedulerFactory.java |  22 ++-
 .../runtime/scheduler/ExecutionGraphFactory.java   |  59 +++++++
 .../flink/runtime/scheduler/SchedulerBase.java     | 149 ++----------------
 .../scheduler/adaptive/AdaptiveScheduler.java      | 123 ++-------------
 .../adaptive/AdaptiveSchedulerFactory.java         |  24 ++-
 .../DefaultExecutionGraphFactoryTest.java          | 156 +++++++++++++++++++
 .../runtime/scheduler/DefaultSchedulerTest.java    |  88 -----------
 .../runtime/scheduler/SchedulerTestingUtils.java   |  22 ++-
 .../adaptive/AdaptiveSchedulerBuilder.java         |  24 ++-
 .../scheduler/adaptive/AdaptiveSchedulerTest.java  | 144 ------------------
 12 files changed, 481 insertions(+), 523 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java
new file mode 100644
index 0000000..3dbb80d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobWriter;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory;
+import org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder;
+import org.apache.flink.runtime.executiongraph.ExecutionDeploymentListener;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionStateUpdateListener;
+import org.apache.flink.runtime.executiongraph.VertexAttemptNumberStore;
+import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTracker;
+import 
org.apache.flink.runtime.jobmaster.ExecutionDeploymentTrackerDeploymentListenerAdapter;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
+
+import org.slf4j.Logger;
+
+import java.util.HashSet;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+
+/** Default {@link ExecutionGraphFactory} implementation. */
+public class DefaultExecutionGraphFactory implements ExecutionGraphFactory {
+
+    private final Configuration configuration;
+    private final ClassLoader userCodeClassLoader;
+    private final ExecutionDeploymentTracker executionDeploymentTracker;
+    private final ScheduledExecutorService futureExecutor;
+    private final Executor ioExecutor;
+    private final Time rpcTimeout;
+    private final JobManagerJobMetricGroup jobManagerJobMetricGroup;
+    private final BlobWriter blobWriter;
+    private final ShuffleMaster<?> shuffleMaster;
+    private final JobMasterPartitionTracker jobMasterPartitionTracker;
+
+    public DefaultExecutionGraphFactory(
+            Configuration configuration,
+            ClassLoader userCodeClassLoader,
+            ExecutionDeploymentTracker executionDeploymentTracker,
+            ScheduledExecutorService futureExecutor,
+            Executor ioExecutor,
+            Time rpcTimeout,
+            JobManagerJobMetricGroup jobManagerJobMetricGroup,
+            BlobWriter blobWriter,
+            ShuffleMaster<?> shuffleMaster,
+            JobMasterPartitionTracker jobMasterPartitionTracker) {
+        this.configuration = configuration;
+        this.userCodeClassLoader = userCodeClassLoader;
+        this.executionDeploymentTracker = executionDeploymentTracker;
+        this.futureExecutor = futureExecutor;
+        this.ioExecutor = ioExecutor;
+        this.rpcTimeout = rpcTimeout;
+        this.jobManagerJobMetricGroup = jobManagerJobMetricGroup;
+        this.blobWriter = blobWriter;
+        this.shuffleMaster = shuffleMaster;
+        this.jobMasterPartitionTracker = jobMasterPartitionTracker;
+    }
+
+    @Override
+    public ExecutionGraph createAndRestoreExecutionGraph(
+            JobGraph jobGraph,
+            CompletedCheckpointStore completedCheckpointStore,
+            CheckpointsCleaner checkpointsCleaner,
+            CheckpointIDCounter checkpointIdCounter,
+            TaskDeploymentDescriptorFactory.PartitionLocationConstraint 
partitionLocationConstraint,
+            long initializationTimestamp,
+            VertexAttemptNumberStore vertexAttemptNumberStore,
+            Logger log)
+            throws Exception {
+        ExecutionDeploymentListener executionDeploymentListener =
+                new 
ExecutionDeploymentTrackerDeploymentListenerAdapter(executionDeploymentTracker);
+        ExecutionStateUpdateListener executionStateUpdateListener =
+                (execution, newState) -> {
+                    if (newState.isTerminal()) {
+                        
executionDeploymentTracker.stopTrackingDeploymentOf(execution);
+                    }
+                };
+
+        final ExecutionGraph newExecutionGraph =
+                DefaultExecutionGraphBuilder.buildGraph(
+                        jobGraph,
+                        configuration,
+                        futureExecutor,
+                        ioExecutor,
+                        userCodeClassLoader,
+                        completedCheckpointStore,
+                        checkpointsCleaner,
+                        checkpointIdCounter,
+                        rpcTimeout,
+                        jobManagerJobMetricGroup,
+                        blobWriter,
+                        log,
+                        shuffleMaster,
+                        jobMasterPartitionTracker,
+                        partitionLocationConstraint,
+                        executionDeploymentListener,
+                        executionStateUpdateListener,
+                        initializationTimestamp,
+                        vertexAttemptNumberStore);
+
+        final CheckpointCoordinator checkpointCoordinator =
+                newExecutionGraph.getCheckpointCoordinator();
+
+        if (checkpointCoordinator != null) {
+            // check whether we find a valid checkpoint
+            if (!checkpointCoordinator.restoreInitialCheckpointIfPresent(
+                    new 
HashSet<>(newExecutionGraph.getAllVertices().values()))) {
+
+                // check whether we can restore from a savepoint
+                tryRestoreExecutionGraphFromSavepoint(
+                        newExecutionGraph, 
jobGraph.getSavepointRestoreSettings());
+            }
+        }
+
+        return newExecutionGraph;
+    }
+
+    /**
+     * Tries to restore the given {@link ExecutionGraph} from the provided {@link
+     * SavepointRestoreSettings}, iff checkpointing is enabled.
+     *
+     * @param executionGraphToRestore {@link ExecutionGraph} which is supposed to be restored
+     * @param savepointRestoreSettings {@link SavepointRestoreSettings} containing information about
+     *     the savepoint to restore from
+     * @throws Exception if the {@link ExecutionGraph} could not be restored
+     */
+    private void tryRestoreExecutionGraphFromSavepoint(
+            ExecutionGraph executionGraphToRestore,
+            SavepointRestoreSettings savepointRestoreSettings)
+            throws Exception {
+        if (savepointRestoreSettings.restoreSavepoint()) {
+            final CheckpointCoordinator checkpointCoordinator =
+                    executionGraphToRestore.getCheckpointCoordinator();
+            if (checkpointCoordinator != null) {
+                checkpointCoordinator.restoreSavepoint(
+                        savepointRestoreSettings.getRestorePath(),
+                        savepointRestoreSettings.allowNonRestoredState(),
+                        executionGraphToRestore.getAllVertices(),
+                        userCodeClassLoader);
+            }
+        }
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
index 394d1cf..be29192 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
@@ -20,9 +20,7 @@
 package org.apache.flink.runtime.scheduler;
 
 import org.apache.flink.api.common.JobStatus;
-import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
@@ -38,13 +36,11 @@ import org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHa
 import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategy;
 import 
org.apache.flink.runtime.executiongraph.failover.flip1.FailureHandlingResult;
 import 
org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
-import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTracker;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
 import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
@@ -52,7 +48,6 @@ import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategy;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
-import org.apache.flink.runtime.shuffle.ShuffleMaster;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.ExceptionUtils;
 
@@ -70,7 +65,6 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.function.BiFunction;
@@ -106,25 +100,20 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio
             final Executor ioExecutor,
             final Configuration jobMasterConfiguration,
             final Consumer<ComponentMainThreadExecutor> startUpAction,
-            final ScheduledExecutorService futureExecutor,
             final ScheduledExecutor delayExecutor,
             final ClassLoader userCodeLoader,
             final CheckpointRecoveryFactory checkpointRecoveryFactory,
-            final Time rpcTimeout,
-            final BlobWriter blobWriter,
             final JobManagerJobMetricGroup jobManagerJobMetricGroup,
-            final ShuffleMaster<?> shuffleMaster,
-            final JobMasterPartitionTracker partitionTracker,
             final SchedulingStrategyFactory schedulingStrategyFactory,
             final FailoverStrategy.Factory failoverStrategyFactory,
             final RestartBackoffTimeStrategy restartBackoffTimeStrategy,
             final ExecutionVertexOperations executionVertexOperations,
             final ExecutionVertexVersioner executionVertexVersioner,
             final ExecutionSlotAllocatorFactory executionSlotAllocatorFactory,
-            final ExecutionDeploymentTracker executionDeploymentTracker,
             long initializationTimestamp,
             final ComponentMainThreadExecutor mainThreadExecutor,
-            final JobStatusListener jobStatusListener)
+            final JobStatusListener jobStatusListener,
+            final ExecutionGraphFactory executionGraphFactory)
             throws Exception {
 
         super(
@@ -132,19 +121,14 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio
                 jobGraph,
                 ioExecutor,
                 jobMasterConfiguration,
-                futureExecutor,
                 userCodeLoader,
                 checkpointRecoveryFactory,
-                rpcTimeout,
-                blobWriter,
                 jobManagerJobMetricGroup,
-                shuffleMaster,
-                partitionTracker,
                 executionVertexVersioner,
-                executionDeploymentTracker,
                 initializationTimestamp,
                 mainThreadExecutor,
-                jobStatusListener);
+                jobStatusListener,
+                executionGraphFactory);
 
         this.log = log;
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
index 217008f..cf00887 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
@@ -101,31 +101,39 @@ public class DefaultSchedulerFactory implements SchedulerNGFactory {
                 jobGraph.getName(),
                 jobGraph.getJobID());
 
+        final ExecutionGraphFactory executionGraphFactory =
+                new DefaultExecutionGraphFactory(
+                        jobMasterConfiguration,
+                        userCodeLoader,
+                        executionDeploymentTracker,
+                        futureExecutor,
+                        ioExecutor,
+                        rpcTimeout,
+                        jobManagerJobMetricGroup,
+                        blobWriter,
+                        shuffleMaster,
+                        partitionTracker);
+
         return new DefaultScheduler(
                 log,
                 jobGraph,
                 ioExecutor,
                 jobMasterConfiguration,
                 schedulerComponents.getStartUpAction(),
-                futureExecutor,
                 new ScheduledExecutorServiceAdapter(futureExecutor),
                 userCodeLoader,
                 checkpointRecoveryFactory,
-                rpcTimeout,
-                blobWriter,
                 jobManagerJobMetricGroup,
-                shuffleMaster,
-                partitionTracker,
                 schedulerComponents.getSchedulingStrategyFactory(),
                 
FailoverStrategyFactoryLoader.loadFailoverStrategyFactory(jobMasterConfiguration),
                 restartBackoffTimeStrategy,
                 new DefaultExecutionVertexOperations(),
                 new ExecutionVertexVersioner(),
                 schedulerComponents.getAllocatorFactory(),
-                executionDeploymentTracker,
                 initializationTimestamp,
                 mainThreadExecutor,
-                jobStatusListener);
+                jobStatusListener,
+                executionGraphFactory);
     }
 
     @Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphFactory.java
new file mode 100644
index 0000000..f7fed50
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphFactory.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.VertexAttemptNumberStore;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+
+import org.slf4j.Logger;
+
+/** Factory for creating an {@link ExecutionGraph}. */
+public interface ExecutionGraphFactory {
+
+    /**
+     * Create and restore {@link ExecutionGraph} from the given {@link JobGraph} and services.
+     *
+     * @param jobGraph jobGraph to initialize the ExecutionGraph with
+     * @param completedCheckpointStore completedCheckpointStore to pass to the CheckpointCoordinator
+     * @param checkpointsCleaner checkpointsCleaner to pass to the CheckpointCoordinator
+     * @param checkpointIdCounter checkpointIdCounter to pass to the CheckpointCoordinator
+     * @param partitionLocationConstraint partitionLocationConstraint for this job
+     * @param initializationTimestamp initializationTimestamp when the ExecutionGraph was created
+     * @param vertexAttemptNumberStore vertexAttemptNumberStore keeping information about the vertex
+     *     attempts of previous runs
+     * @param log log to use for logging
+     * @return restored {@link ExecutionGraph}
+     * @throws Exception if the {@link ExecutionGraph} could not be created and restored
+     */
+    ExecutionGraph createAndRestoreExecutionGraph(
+            JobGraph jobGraph,
+            CompletedCheckpointStore completedCheckpointStore,
+            CheckpointsCleaner checkpointsCleaner,
+            CheckpointIDCounter checkpointIdCounter,
+            TaskDeploymentDescriptorFactory.PartitionLocationConstraint 
partitionLocationConstraint,
+            long initializationTimestamp,
+            VertexAttemptNumberStore vertexAttemptNumberStore,
+            Logger log)
+            throws Exception;
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
index 87a1354..3ddb55f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
@@ -22,13 +22,10 @@ package org.apache.flink.runtime.scheduler;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
-import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.queryablestate.KvStateID;
-import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
-import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
 import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
@@ -40,37 +37,29 @@ import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
-import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
-import org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder;
 import org.apache.flink.runtime.executiongraph.DefaultVertexAttemptNumberStore;
 import org.apache.flink.runtime.executiongraph.ErrorInfo;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.executiongraph.ExecutionDeploymentListener;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.ExecutionStateUpdateListener;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.executiongraph.JobStatusListener;
 import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
 import 
org.apache.flink.runtime.executiongraph.failover.flip1.FailureHandlingResult;
 import 
org.apache.flink.runtime.executiongraph.failover.flip1.ResultPartitionAvailabilityChecker;
-import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
-import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
-import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTracker;
-import 
org.apache.flink.runtime.jobmaster.ExecutionDeploymentTrackerDeploymentListenerAdapter;
 import org.apache.flink.runtime.jobmaster.SerializedInputSplit;
 import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
 import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
@@ -89,7 +78,6 @@ import org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointTer
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
-import org.apache.flink.runtime.shuffle.ShuffleMaster;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.util.IntArrayList;
 import org.apache.flink.util.ExceptionUtils;
@@ -115,7 +103,6 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
@@ -137,24 +124,12 @@ public abstract class SchedulerBase implements SchedulerNG, CheckpointScheduling
 
     protected final InputsLocationsRetriever inputsLocationsRetriever;
 
-    private final Executor ioExecutor;
-
-    private final Configuration jobMasterConfiguration;
-
-    private final ScheduledExecutorService futureExecutor;
-
-    private final ClassLoader userCodeLoader;
-
     private final CompletedCheckpointStore completedCheckpointStore;
 
     private final CheckpointsCleaner checkpointsCleaner;
 
     private final CheckpointIDCounter checkpointIdCounter;
 
-    private final Time rpcTimeout;
-
-    private final BlobWriter blobWriter;
-
     private final JobManagerJobMetricGroup jobManagerJobMetricGroup;
 
     protected final ExecutionVertexVersioner executionVertexVersioner;
@@ -169,35 +144,27 @@ public abstract class SchedulerBase implements SchedulerNG, CheckpointScheduling
 
     private final List<ErrorInfo> taskFailureHistory = new ArrayList<>();
 
+    private final ExecutionGraphFactory executionGraphFactory;
+
     public SchedulerBase(
             final Logger log,
             final JobGraph jobGraph,
             final Executor ioExecutor,
             final Configuration jobMasterConfiguration,
-            final ScheduledExecutorService futureExecutor,
             final ClassLoader userCodeLoader,
             final CheckpointRecoveryFactory checkpointRecoveryFactory,
-            final Time rpcTimeout,
-            final BlobWriter blobWriter,
             final JobManagerJobMetricGroup jobManagerJobMetricGroup,
-            final ShuffleMaster<?> shuffleMaster,
-            final JobMasterPartitionTracker partitionTracker,
             final ExecutionVertexVersioner executionVertexVersioner,
-            final ExecutionDeploymentTracker executionDeploymentTracker,
             long initializationTimestamp,
             final ComponentMainThreadExecutor mainThreadExecutor,
-            final JobStatusListener jobStatusListener)
+            final JobStatusListener jobStatusListener,
+            final ExecutionGraphFactory executionGraphFactory)
             throws Exception {
 
         this.log = checkNotNull(log);
         this.jobGraph = checkNotNull(jobGraph);
-        this.ioExecutor = checkNotNull(ioExecutor);
-        this.jobMasterConfiguration = checkNotNull(jobMasterConfiguration);
-        this.futureExecutor = checkNotNull(futureExecutor);
-        this.userCodeLoader = checkNotNull(userCodeLoader);
-        this.rpcTimeout = checkNotNull(rpcTimeout);
+        this.executionGraphFactory = executionGraphFactory;
 
-        this.blobWriter = checkNotNull(blobWriter);
         this.jobManagerJobMetricGroup = checkNotNull(jobManagerJobMetricGroup);
         this.executionVertexVersioner = checkNotNull(executionVertexVersioner);
         this.mainThreadExecutor = mainThreadExecutor;
@@ -216,13 +183,9 @@ public abstract class SchedulerBase implements SchedulerNG, CheckpointScheduling
 
         this.executionGraph =
                 createAndRestoreExecutionGraph(
-                        jobManagerJobMetricGroup,
                         completedCheckpointStore,
                         checkpointsCleaner,
                         checkpointIdCounter,
-                        checkNotNull(shuffleMaster),
-                        checkNotNull(partitionTracker),
-                        checkNotNull(executionDeploymentTracker),
                         initializationTimestamp,
                         mainThreadExecutor,
                         jobStatusListener);
@@ -273,42 +236,25 @@ public abstract class SchedulerBase implements SchedulerNG, CheckpointScheduling
     }
 
     private ExecutionGraph createAndRestoreExecutionGraph(
-            JobManagerJobMetricGroup currentJobManagerJobMetricGroup,
             CompletedCheckpointStore completedCheckpointStore,
             CheckpointsCleaner checkpointsCleaner,
             CheckpointIDCounter checkpointIdCounter,
-            ShuffleMaster<?> shuffleMaster,
-            JobMasterPartitionTracker partitionTracker,
-            ExecutionDeploymentTracker executionDeploymentTracker,
             long initializationTimestamp,
             ComponentMainThreadExecutor mainThreadExecutor,
             JobStatusListener jobStatusListener)
             throws Exception {
 
-        ExecutionGraph newExecutionGraph =
-                createExecutionGraph(
-                        currentJobManagerJobMetricGroup,
+        final ExecutionGraph newExecutionGraph =
+                executionGraphFactory.createAndRestoreExecutionGraph(
+                        jobGraph,
                         completedCheckpointStore,
                         checkpointsCleaner,
                         checkpointIdCounter,
-                        shuffleMaster,
-                        partitionTracker,
-                        executionDeploymentTracker,
-                        initializationTimestamp);
-
-        final CheckpointCoordinator checkpointCoordinator =
-                newExecutionGraph.getCheckpointCoordinator();
-
-        if (checkpointCoordinator != null) {
-            // check whether we find a valid checkpoint
-            if (!checkpointCoordinator.restoreInitialCheckpointIfPresent(
-                    new 
HashSet<>(newExecutionGraph.getAllVertices().values()))) {
-
-                // check whether we can restore from a savepoint
-                tryRestoreExecutionGraphFromSavepoint(
-                        newExecutionGraph, 
jobGraph.getSavepointRestoreSettings());
-            }
-        }
+                        
TaskDeploymentDescriptorFactory.PartitionLocationConstraint.fromJobType(
+                                jobGraph.getJobType()),
+                        initializationTimestamp,
+                        new DefaultVertexAttemptNumberStore(),
+                        log);
 
         newExecutionGraph.setInternalTaskFailuresListener(
                 new UpdateSchedulerNgOnInternalFailuresListener(this));
@@ -318,75 +264,6 @@ public abstract class SchedulerBase implements 
SchedulerNG, CheckpointScheduling
         return newExecutionGraph;
     }
 
-    private ExecutionGraph createExecutionGraph(
-            JobManagerJobMetricGroup currentJobManagerJobMetricGroup,
-            CompletedCheckpointStore completedCheckpointStore,
-            CheckpointsCleaner checkpointsCleaner,
-            CheckpointIDCounter checkpointIdCounter,
-            ShuffleMaster<?> shuffleMaster,
-            final JobMasterPartitionTracker partitionTracker,
-            ExecutionDeploymentTracker executionDeploymentTracker,
-            long initializationTimestamp)
-            throws JobExecutionException, JobException {
-
-        ExecutionDeploymentListener executionDeploymentListener =
-                new 
ExecutionDeploymentTrackerDeploymentListenerAdapter(executionDeploymentTracker);
-        ExecutionStateUpdateListener executionStateUpdateListener =
-                (execution, newState) -> {
-                    if (newState.isTerminal()) {
-                        
executionDeploymentTracker.stopTrackingDeploymentOf(execution);
-                    }
-                };
-
-        return DefaultExecutionGraphBuilder.buildGraph(
-                jobGraph,
-                jobMasterConfiguration,
-                futureExecutor,
-                ioExecutor,
-                userCodeLoader,
-                completedCheckpointStore,
-                checkpointsCleaner,
-                checkpointIdCounter,
-                rpcTimeout,
-                currentJobManagerJobMetricGroup,
-                blobWriter,
-                log,
-                shuffleMaster,
-                partitionTracker,
-                
TaskDeploymentDescriptorFactory.PartitionLocationConstraint.fromJobType(
-                        jobGraph.getJobType()),
-                executionDeploymentListener,
-                executionStateUpdateListener,
-                initializationTimestamp,
-                new DefaultVertexAttemptNumberStore());
-    }
-
-    /**
-     * Tries to restore the given {@link ExecutionGraph} from the provided 
{@link
-     * SavepointRestoreSettings}.
-     *
-     * @param executionGraphToRestore {@link ExecutionGraph} which is supposed 
to be restored
-     * @param savepointRestoreSettings {@link SavepointRestoreSettings} 
containing information about
-     *     the savepoint to restore from
-     * @throws Exception if the {@link ExecutionGraph} could not be restored
-     */
-    private void tryRestoreExecutionGraphFromSavepoint(
-            ExecutionGraph executionGraphToRestore,
-            SavepointRestoreSettings savepointRestoreSettings)
-            throws Exception {
-        if (savepointRestoreSettings.restoreSavepoint()) {
-            final CheckpointCoordinator checkpointCoordinator =
-                    executionGraphToRestore.getCheckpointCoordinator();
-            if (checkpointCoordinator != null) {
-                checkpointCoordinator.restoreSavepoint(
-                        savepointRestoreSettings.getRestorePath(),
-                        savepointRestoreSettings.allowNonRestoredState(),
-                        executionGraphToRestore.getAllVertices(),
-                        userCodeLoader);
-            }
-        }
-    }
-
     protected void resetForNewExecutions(final Collection<ExecutionVertexID> 
vertices) {
         vertices.stream()
                 .map(this::getExecutionVertex)
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
index badad27..a7783db 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
@@ -21,15 +21,12 @@ package org.apache.flink.runtime.scheduler.adaptive;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
-import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.queryablestate.KvStateID;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
-import org.apache.flink.runtime.blob.BlobWriter;
-import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
 import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
@@ -45,20 +42,16 @@ import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
-import org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder;
 import org.apache.flink.runtime.executiongraph.DefaultVertexAttemptNumberStore;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.executiongraph.ExecutionDeploymentListener;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.ExecutionStateUpdateListener;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.executiongraph.JobStatusListener;
 import org.apache.flink.runtime.executiongraph.MutableVertexAttemptNumberStore;
 import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
 import 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler;
 import 
org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
-import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobEdge;
@@ -67,10 +60,7 @@ import org.apache.flink.runtime.jobgraph.JobType;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
-import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
-import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTracker;
-import 
org.apache.flink.runtime.jobmaster.ExecutionDeploymentTrackerDeploymentListenerAdapter;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.jobmaster.SerializedInputSplit;
 import org.apache.flink.runtime.jobmaster.SlotInfo;
@@ -88,6 +78,7 @@ import 
org.apache.flink.runtime.operators.coordination.TaskNotRunningException;
 import org.apache.flink.runtime.query.KvStateLocation;
 import org.apache.flink.runtime.query.UnknownKvStateLocation;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.scheduler.ExecutionGraphFactory;
 import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
 import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
@@ -99,7 +90,6 @@ import 
org.apache.flink.runtime.scheduler.adaptive.allocator.SlotAllocator;
 import org.apache.flink.runtime.scheduler.adaptive.allocator.VertexParallelism;
 import 
org.apache.flink.runtime.scheduler.adaptive.scalingpolicy.ReactiveScaleUpController;
 import 
org.apache.flink.runtime.scheduler.adaptive.scalingpolicy.ScaleUpController;
-import org.apache.flink.runtime.shuffle.ShuffleMaster;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.util.ResourceCounter;
 import org.apache.flink.util.ExceptionUtils;
@@ -118,13 +108,11 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.time.Duration;
 import java.util.Collection;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
@@ -164,15 +152,8 @@ public class AdaptiveScheduler
 
     private final long initializationTimestamp;
 
-    private final Configuration configuration;
-    private final ScheduledExecutorService futureExecutor;
     private final Executor ioExecutor;
     private final ClassLoader userCodeClassLoader;
-    private final Time rpcTimeout;
-    private final BlobWriter blobWriter;
-    private final ShuffleMaster<?> shuffleMaster;
-    private final JobMasterPartitionTracker partitionTracker;
-    private final ExecutionDeploymentTracker executionDeploymentTracker;
     private final JobManagerJobMetricGroup jobManagerJobMetricGroup;
 
     private final CompletedCheckpointStore completedCheckpointStore;
@@ -194,6 +175,8 @@ public class AdaptiveScheduler
 
     private final Duration resourceTimeout;
 
+    private final ExecutionGraphFactory executionGraphFactory;
+
     private State state = new Created(this, LOG);
 
     private boolean isTransitioningState = false;
@@ -208,21 +191,16 @@ public class AdaptiveScheduler
             Configuration configuration,
             DeclarativeSlotPool declarativeSlotPool,
             SlotAllocator slotAllocator,
-            ScheduledExecutorService futureExecutor,
             Executor ioExecutor,
             ClassLoader userCodeClassLoader,
             CheckpointRecoveryFactory checkpointRecoveryFactory,
-            Time rpcTimeout,
-            BlobWriter blobWriter,
             JobManagerJobMetricGroup jobManagerJobMetricGroup,
-            ShuffleMaster<?> shuffleMaster,
-            JobMasterPartitionTracker partitionTracker,
             RestartBackoffTimeStrategy restartBackoffTimeStrategy,
-            ExecutionDeploymentTracker executionDeploymentTracker,
             long initializationTimestamp,
             ComponentMainThreadExecutor mainThreadExecutor,
             FatalErrorHandler fatalErrorHandler,
-            JobStatusListener jobStatusListener)
+            JobStatusListener jobStatusListener,
+            ExecutionGraphFactory executionGraphFactory)
             throws JobExecutionException {
 
         ensureFullyPipelinedStreamingJob(jobGraph);
@@ -230,16 +208,9 @@ public class AdaptiveScheduler
         this.jobInformation = new JobGraphJobInformation(jobGraph);
         this.declarativeSlotPool = declarativeSlotPool;
         this.initializationTimestamp = initializationTimestamp;
-        this.configuration = configuration;
-        this.futureExecutor = futureExecutor;
         this.ioExecutor = ioExecutor;
         this.userCodeClassLoader = userCodeClassLoader;
-        this.rpcTimeout = rpcTimeout;
-        this.blobWriter = blobWriter;
-        this.shuffleMaster = shuffleMaster;
-        this.partitionTracker = partitionTracker;
         this.restartBackoffTimeStrategy = restartBackoffTimeStrategy;
-        this.executionDeploymentTracker = executionDeploymentTracker;
         this.jobManagerJobMetricGroup = jobManagerJobMetricGroup;
         this.fatalErrorHandler = fatalErrorHandler;
         this.completedCheckpointStore =
@@ -265,6 +236,8 @@ public class AdaptiveScheduler
 
         this.resourceTimeout = 
configuration.get(JobManagerOptions.RESOURCE_WAIT_TIMEOUT);
 
+        this.executionGraphFactory = executionGraphFactory;
+
         registerMetrics();
     }
 
@@ -794,79 +767,15 @@ public class AdaptiveScheduler
     @Nonnull
     private ExecutionGraph createExecutionGraphAndRestoreState(JobGraph 
adjustedJobGraph)
             throws Exception {
-        ExecutionDeploymentListener executionDeploymentListener =
-                new 
ExecutionDeploymentTrackerDeploymentListenerAdapter(executionDeploymentTracker);
-        ExecutionStateUpdateListener executionStateUpdateListener =
-                (execution, newState) -> {
-                    if (newState.isTerminal()) {
-                        
executionDeploymentTracker.stopTrackingDeploymentOf(execution);
-                    }
-                };
-
-        final ExecutionGraph newExecutionGraph =
-                DefaultExecutionGraphBuilder.buildGraph(
-                        adjustedJobGraph,
-                        configuration,
-                        futureExecutor,
-                        ioExecutor,
-                        userCodeClassLoader,
-                        completedCheckpointStore,
-                        checkpointsCleaner,
-                        checkpointIdCounter,
-                        rpcTimeout,
-                        jobManagerJobMetricGroup,
-                        blobWriter,
-                        LOG,
-                        shuffleMaster,
-                        partitionTracker,
-                        
TaskDeploymentDescriptorFactory.PartitionLocationConstraint
-                                .MUST_BE_KNOWN, // AdaptiveScheduler only 
supports streaming jobs
-                        executionDeploymentListener,
-                        executionStateUpdateListener,
-                        initializationTimestamp,
-                        vertexAttemptNumberStore);
-
-        final CheckpointCoordinator checkpointCoordinator =
-                newExecutionGraph.getCheckpointCoordinator();
-
-        if (checkpointCoordinator != null) {
-            // check whether we find a valid checkpoint
-            if (!checkpointCoordinator.restoreInitialCheckpointIfPresent(
-                    new 
HashSet<>(newExecutionGraph.getAllVertices().values()))) {
-
-                // check whether we can restore from a savepoint
-                tryRestoreExecutionGraphFromSavepoint(
-                        newExecutionGraph, 
adjustedJobGraph.getSavepointRestoreSettings());
-            }
-        }
-
-        return newExecutionGraph;
-    }
-
-    /**
-     * Tries to restore the given {@link ExecutionGraph} from the provided 
{@link
-     * SavepointRestoreSettings}, iff checkpointing is enabled.
-     *
-     * @param executionGraphToRestore {@link ExecutionGraph} which is supposed 
to be restored
-     * @param savepointRestoreSettings {@link SavepointRestoreSettings} 
containing information about
-     *     the savepoint to restore from
-     * @throws Exception if the {@link ExecutionGraph} could not be restored
-     */
-    private void tryRestoreExecutionGraphFromSavepoint(
-            ExecutionGraph executionGraphToRestore,
-            SavepointRestoreSettings savepointRestoreSettings)
-            throws Exception {
-        if (savepointRestoreSettings.restoreSavepoint()) {
-            final CheckpointCoordinator checkpointCoordinator =
-                    executionGraphToRestore.getCheckpointCoordinator();
-            if (checkpointCoordinator != null) {
-                checkpointCoordinator.restoreSavepoint(
-                        savepointRestoreSettings.getRestorePath(),
-                        savepointRestoreSettings.allowNonRestoredState(),
-                        executionGraphToRestore.getAllVertices(),
-                        userCodeClassLoader);
-            }
-        }
+        return executionGraphFactory.createAndRestoreExecutionGraph(
+                adjustedJobGraph,
+                completedCheckpointStore,
+                checkpointsCleaner,
+                checkpointIdCounter,
+                
TaskDeploymentDescriptorFactory.PartitionLocationConstraint.MUST_BE_KNOWN,
+                initializationTimestamp,
+                vertexAttemptNumberStore,
+                LOG);
     }
 
     @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java
index 278baa0..4c790cc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java
@@ -34,6 +34,8 @@ import 
org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPool;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolService;
 import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory;
+import org.apache.flink.runtime.scheduler.ExecutionGraphFactory;
 import org.apache.flink.runtime.scheduler.SchedulerNG;
 import org.apache.flink.runtime.scheduler.SchedulerNGFactory;
 import 
org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator;
@@ -92,26 +94,34 @@ public class AdaptiveSchedulerFactory implements 
SchedulerNGFactory {
         final SlotSharingSlotAllocator slotAllocator =
                 createSlotSharingSlotAllocator(declarativeSlotPool);
 
+        final ExecutionGraphFactory executionGraphFactory =
+                new DefaultExecutionGraphFactory(
+                        jobMasterConfiguration,
+                        userCodeLoader,
+                        executionDeploymentTracker,
+                        futureExecutor,
+                        ioExecutor,
+                        rpcTimeout,
+                        jobManagerJobMetricGroup,
+                        blobWriter,
+                        shuffleMaster,
+                        partitionTracker);
+
         return new AdaptiveScheduler(
                 jobGraph,
                 jobMasterConfiguration,
                 declarativeSlotPool,
                 slotAllocator,
-                futureExecutor,
                 ioExecutor,
                 userCodeLoader,
                 checkpointRecoveryFactory,
-                rpcTimeout,
-                blobWriter,
                 jobManagerJobMetricGroup,
-                shuffleMaster,
-                partitionTracker,
                 restartBackoffTimeStrategy,
-                executionDeploymentTracker,
                 initializationTimestamp,
                 mainThreadExecutor,
                 fatalErrorHandler,
-                jobStatusListener);
+                jobStatusListener,
+                executionGraphFactory);
     }
 
     @Override
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactoryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactoryTest.java
new file mode 100644
index 0000000..2a57d04
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactoryTest.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.FlinkMatchers;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
+import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory;
+import org.apache.flink.runtime.executiongraph.DefaultVertexAttemptNumberStore;
+import 
org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTracker;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.jobmaster.DefaultExecutionDeploymentTracker;
+import org.apache.flink.runtime.jobmaster.TestUtils;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
+
+import org.hamcrest.MatcherAssert;
+import org.hamcrest.Matchers;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.IOException;
+
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+/** Tests for the {@link DefaultExecutionGraphFactory}. */
+public class DefaultExecutionGraphFactoryTest extends TestLogger {
+
+    @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+
+    @Test
+    public void testRestoringModifiedJobFromSavepointFails() throws Exception {
+        final JobGraph jobGraphWithNewOperator = 
createJobGraphWithSavepoint(false, 42L);
+
+        final ExecutionGraphFactory executionGraphFactory = 
createExecutionGraphFactory();
+
+        try {
+            executionGraphFactory.createAndRestoreExecutionGraph(
+                    jobGraphWithNewOperator,
+                    new StandaloneCompletedCheckpointStore(1),
+                    new CheckpointsCleaner(),
+                    new StandaloneCheckpointIDCounter(),
+                    
TaskDeploymentDescriptorFactory.PartitionLocationConstraint.CAN_BE_UNKNOWN,
+                    0L,
+                    new DefaultVertexAttemptNumberStore(),
+                    log);
+            fail("Expected ExecutionGraph creation to fail because of non 
restored state.");
+        } catch (Exception e) {
+            assertThat(
+                    e, FlinkMatchers.containsMessage("Failed to rollback to 
checkpoint/savepoint"));
+        }
+    }
+
+    @Test
+    public void 
testRestoringModifiedJobFromSavepointWithAllowNonRestoredStateSucceeds()
+            throws Exception {
+        // create savepoint data
+        final long savepointId = 42L;
+        final JobGraph jobGraphWithNewOperator = 
createJobGraphWithSavepoint(true, savepointId);
+
+        final ExecutionGraphFactory executionGraphFactory = 
createExecutionGraphFactory();
+
+        final StandaloneCompletedCheckpointStore completedCheckpointStore =
+                new StandaloneCompletedCheckpointStore(1);
+        executionGraphFactory.createAndRestoreExecutionGraph(
+                jobGraphWithNewOperator,
+                completedCheckpointStore,
+                new CheckpointsCleaner(),
+                new StandaloneCheckpointIDCounter(),
+                
TaskDeploymentDescriptorFactory.PartitionLocationConstraint.CAN_BE_UNKNOWN,
+                0L,
+                new DefaultVertexAttemptNumberStore(),
+                log);
+
+        final CompletedCheckpoint savepoint = 
completedCheckpointStore.getLatestCheckpoint(false);
+
+        MatcherAssert.assertThat(savepoint, notNullValue());
+
+        MatcherAssert.assertThat(savepoint.getCheckpointID(), 
Matchers.is(savepointId));
+    }
+
+    @Nonnull
+    private ExecutionGraphFactory createExecutionGraphFactory() {
+        final ExecutionGraphFactory executionGraphFactory =
+                new DefaultExecutionGraphFactory(
+                        new Configuration(),
+                        ClassLoader.getSystemClassLoader(),
+                        new DefaultExecutionDeploymentTracker(),
+                        TestingUtils.defaultExecutor(),
+                        TestingUtils.defaultExecutor(),
+                        Time.milliseconds(0L),
+                        
UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup(),
+                        VoidBlobWriter.getInstance(),
+                        NettyShuffleMaster.INSTANCE,
+                        NoOpJobMasterPartitionTracker.INSTANCE);
+        return executionGraphFactory;
+    }
+
+    @Nonnull
+    private JobGraph createJobGraphWithSavepoint(boolean 
allowNonRestoredState, long savepointId)
+            throws IOException {
+        // create savepoint data
+        final OperatorID operatorID = new OperatorID();
+        final File savepointFile =
+                TestUtils.createSavepointWithOperatorState(
+                        TEMPORARY_FOLDER.newFile(), savepointId, operatorID);
+
+        // set savepoint settings; the caller decides whether non restored state is allowed
+        final SavepointRestoreSettings savepointRestoreSettings =
+                SavepointRestoreSettings.forPath(
+                        savepointFile.getAbsolutePath(), 
allowNonRestoredState);
+
+        // create a new operator
+        final JobVertex jobVertex = new JobVertex("New operator");
+        jobVertex.setInvokableClass(NoOpInvokable.class);
+        jobVertex.setParallelism(1);
+
+        // the savepoint holds state for an OperatorID that matches no operator of this
+        // JobGraph, so restoring it only succeeds if non restored state is allowed
+        return TestUtils.createJobGraphFromJobVerticesWithCheckpointing(
+                savepointRestoreSettings, jobVertex);
+    }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
index 3be6b25..e924538 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
@@ -24,10 +24,6 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.FlinkMatchers;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
-import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
-import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
-import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
-import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
 import org.apache.flink.runtime.checkpoint.hooks.TestMasterHook;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
@@ -49,12 +45,9 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobgraph.OperatorID;
-import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
-import org.apache.flink.runtime.jobmaster.TestUtils;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import 
org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
@@ -72,7 +65,6 @@ import org.apache.flink.util.TestLogger;
 import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
 import org.apache.flink.shaded.guava18.com.google.common.collect.Range;
 
-import org.hamcrest.MatcherAssert;
 import org.hamcrest.core.Is;
 import org.junit.After;
 import org.junit.Before;
@@ -80,7 +72,6 @@ import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
-import java.io.File;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
@@ -94,7 +85,6 @@ import java.util.function.Consumer;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
-import static 
org.apache.flink.runtime.checkpoint.PerJobCheckpointRecoveryFactory.useSameServicesForAllJobs;
 import static 
org.apache.flink.runtime.scheduler.SchedulerTestingUtils.acknowledgePendingCheckpoint;
 import static 
org.apache.flink.runtime.scheduler.SchedulerTestingUtils.enableCheckpointing;
 import static 
org.apache.flink.runtime.scheduler.SchedulerTestingUtils.getCheckpointCoordinator;
@@ -1141,84 +1131,6 @@ public class DefaultSchedulerTest extends TestLogger {
                 
lessThanOrEqualTo(updateStateTriggeringJobFailureTimeframe.upperEndpoint()));
     }
 
-    @Test
-    public void testRestoringModifiedJobFromSavepointFails() throws Exception {
-        // create savepoint data
-        final long savepointId = 42L;
-        final OperatorID operatorID = new OperatorID();
-        final File savepointFile =
-                TestUtils.createSavepointWithOperatorState(
-                        TEMPORARY_FOLDER.newFile(), savepointId, operatorID);
-
-        // set savepoint settings which don't allow non restored state
-        final SavepointRestoreSettings savepointRestoreSettings =
-                
SavepointRestoreSettings.forPath(savepointFile.getAbsolutePath(), false);
-
-        // create a new operator
-        final JobVertex jobVertex = new JobVertex("New operator");
-        jobVertex.setInvokableClass(NoOpInvokable.class);
-
-        // this test will fail in the end due to the previously created 
Savepoint having a state for
-        // a given OperatorID that does not match any operator of the newly 
created JobGraph
-        final JobGraph jobGraphWithNewOperator =
-                TestUtils.createJobGraphFromJobVerticesWithCheckpointing(
-                        savepointRestoreSettings, jobVertex);
-
-        try {
-            // creating the DefaultScheduler should try to restore the 
ExecutionGraph
-            SchedulerTestingUtils.newSchedulerBuilder(
-                            jobGraphWithNewOperator,
-                            
ComponentMainThreadExecutorServiceAdapter.forMainThread())
-                    .build();
-            fail("Expected JobMaster creation to fail because of restore 
failure.");
-        } catch (IllegalStateException ise) {
-            assertThat(
-                    ise,
-                    FlinkMatchers.containsMessage("Failed to rollback to 
checkpoint/savepoint"));
-        }
-    }
-
-    @Test
-    public void 
testRestoringModifiedJobFromSavepointWithAllowNonRestoredStateSucceeds()
-            throws Exception {
-        // create savepoint data
-        final long savepointId = 42L;
-        final OperatorID operatorID = new OperatorID();
-        final File savepointFile =
-                TestUtils.createSavepointWithOperatorState(
-                        TEMPORARY_FOLDER.newFile(), savepointId, operatorID);
-
-        // allow for non restored state
-        final SavepointRestoreSettings savepointRestoreSettings =
-                
SavepointRestoreSettings.forPath(savepointFile.getAbsolutePath(), true);
-
-        // create a new operator
-        final JobVertex jobVertex = new JobVertex("New operator");
-        jobVertex.setInvokableClass(NoOpInvokable.class);
-        final JobGraph jobGraphWithNewOperator =
-                TestUtils.createJobGraphFromJobVerticesWithCheckpointing(
-                        savepointRestoreSettings, jobVertex);
-
-        final StandaloneCompletedCheckpointStore completedCheckpointStore =
-                new StandaloneCompletedCheckpointStore(1);
-        final CheckpointRecoveryFactory testingCheckpointRecoveryFactory =
-                useSameServicesForAllJobs(
-                        completedCheckpointStore, new 
StandaloneCheckpointIDCounter());
-
-        SchedulerTestingUtils.newSchedulerBuilder(
-                        jobGraphWithNewOperator,
-                        
ComponentMainThreadExecutorServiceAdapter.forMainThread())
-                .setCheckpointRecoveryFactory(testingCheckpointRecoveryFactory)
-                .build();
-
-        // creating the DefaultScheduler should have read the savepoint
-        final CompletedCheckpoint savepoint = 
completedCheckpointStore.getLatestCheckpoint(false);
-
-        MatcherAssert.assertThat(savepoint, notNullValue());
-
-        MatcherAssert.assertThat(savepoint.getCheckpointID(), is(savepointId));
-    }
-
     private static TaskExecutionState createFailedTaskExecutionState(
             ExecutionAttemptID executionAttemptID) {
         return new TaskExecutionState(
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
index 1901953..9f7d993 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
@@ -523,31 +523,39 @@ public class SchedulerTestingUtils {
         }
 
         public DefaultScheduler build() throws Exception {
+            final ExecutionGraphFactory executionGraphFactory =
+                    new DefaultExecutionGraphFactory(
+                            jobMasterConfiguration,
+                            userCodeLoader,
+                            new DefaultExecutionDeploymentTracker(),
+                            futureExecutor,
+                            ioExecutor,
+                            rpcTimeout,
+                            jobManagerJobMetricGroup,
+                            blobWriter,
+                            shuffleMaster,
+                            partitionTracker);
+
             return new DefaultScheduler(
                     log,
                     jobGraph,
                     ioExecutor,
                     jobMasterConfiguration,
                     componentMainThreadExecutor -> {},
-                    futureExecutor,
                     delayExecutor,
                     userCodeLoader,
                     checkpointRecoveryFactory,
-                    rpcTimeout,
-                    blobWriter,
                     jobManagerJobMetricGroup,
-                    shuffleMaster,
-                    partitionTracker,
                     schedulingStrategyFactory,
                     failoverStrategyFactory,
                     restartBackoffTimeStrategy,
                     executionVertexOperations,
                     executionVertexVersioner,
                     executionSlotAllocatorFactory,
-                    new DefaultExecutionDeploymentTracker(),
                     System.currentTimeMillis(),
                     mainThreadExecutor,
-                    jobStatusListener);
+                    jobStatusListener,
+                    executionGraphFactory);
         }
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java
index e005170..9435ccd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java
@@ -37,6 +37,8 @@ import org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool;
 import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory;
+import org.apache.flink.runtime.scheduler.ExecutionGraphFactory;
 import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotAllocator;
 import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
 import org.apache.flink.runtime.shuffle.ShuffleMaster;
@@ -182,6 +184,19 @@ public class AdaptiveSchedulerBuilder {
     }
 
     public AdaptiveScheduler build() throws Exception {
+        final ExecutionGraphFactory executionGraphFactory =
+                new DefaultExecutionGraphFactory(
+                        jobMasterConfiguration,
+                        userCodeLoader,
+                        new DefaultExecutionDeploymentTracker(),
+                        futureExecutor,
+                        ioExecutor,
+                        rpcTimeout,
+                        jobManagerJobMetricGroup,
+                        blobWriter,
+                        shuffleMaster,
+                        partitionTracker);
+
         return new AdaptiveScheduler(
                 jobGraph,
                 jobMasterConfiguration,
@@ -190,20 +205,15 @@ public class AdaptiveSchedulerBuilder {
                         ? AdaptiveSchedulerFactory.createSlotSharingSlotAllocator(
                                 declarativeSlotPool)
                         : slotAllocator,
-                futureExecutor,
                 ioExecutor,
                 userCodeLoader,
                 checkpointRecoveryFactory,
-                rpcTimeout,
-                blobWriter,
                 jobManagerJobMetricGroup,
-                shuffleMaster,
-                partitionTracker,
                 restartBackoffTimeStrategy,
-                new DefaultExecutionDeploymentTracker(),
                 initializationTimestamp,
                 mainThreadExecutor,
                 fatalErrorHandler,
-                jobStatusListener);
+                jobStatusListener,
+                executionGraphFactory);
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
index c4ba684..9bbf7a5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
@@ -22,16 +22,11 @@ import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.core.testutils.FlinkMatchers;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
-import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
-import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
-import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
-import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
 import org.apache.flink.runtime.checkpoint.TestingCheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.TestingCheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.TestingCompletedCheckpointStore;
@@ -54,13 +49,11 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.OperatorID;
-import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
-import org.apache.flink.runtime.jobmaster.TestUtils;
 import org.apache.flink.runtime.jobmaster.slotpool.DefaultAllocatedSlotPool;
 import org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolTestUtils;
@@ -73,22 +66,17 @@ import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
 import org.apache.flink.runtime.operators.coordination.TaskNotRunningException;
 import org.apache.flink.runtime.operators.coordination.TestOperatorEvent;
 import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
-import org.apache.flink.runtime.scheduler.GloballyTerminalJobStatusListener;
 import org.apache.flink.runtime.scheduler.adaptive.allocator.TestingSlotAllocator;
 import org.apache.flink.runtime.slots.ResourceRequirement;
 import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
 import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
-import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.util.ResourceCounter;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.testutils.executor.TestExecutorResource;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
-import org.apache.flink.util.function.FunctionUtils;
 
-import org.hamcrest.MatcherAssert;
-import org.hamcrest.Matchers;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -97,7 +85,6 @@ import org.slf4j.Logger;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
-import java.io.File;
 import java.io.IOException;
 import java.time.Duration;
 import java.util.Optional;
@@ -111,13 +98,11 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.flink.core.testutils.FlinkMatchers.futureFailedWith;
-import static org.apache.flink.runtime.checkpoint.PerJobCheckpointRecoveryFactory.useSameServicesForAllJobs;
 import static org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPoolTest.createSlotOffersForResourceRequirements;
 import static org.apache.flink.runtime.jobmaster.slotpool.SlotPoolTestUtils.offerSlots;
 import static org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.instanceOf;
-import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
@@ -764,135 +749,6 @@ public class AdaptiveSchedulerTest extends TestLogger {
         scheduler.requestPartitionState(new IntermediateDataSetID(), new ResultPartitionID());
     }
 
-    @Test
-    public void testRestoringModifiedJobFromSavepointFails() throws Exception {
-        // create savepoint data
-        final long savepointId = 42L;
-        final OperatorID operatorID = new OperatorID();
-        final File savepointFile =
-                TestUtils.createSavepointWithOperatorState(
-                        TEMPORARY_FOLDER.newFile(), savepointId, operatorID);
-
-        // set savepoint settings which don't allow non restored state
-        final SavepointRestoreSettings savepointRestoreSettings =
-                SavepointRestoreSettings.forPath(savepointFile.getAbsolutePath(), false);
-
-        // create a new operator
-        final JobVertex jobVertex = new JobVertex("New operator");
-        jobVertex.setInvokableClass(NoOpInvokable.class);
-        jobVertex.setParallelism(1);
-
-        // this test will fail in the end due to the previously created Savepoint having a state for
-        // a given OperatorID that does not match any operator of the newly created JobGraph
-        final JobGraph jobGraphWithNewOperator =
-                TestUtils.createJobGraphFromJobVerticesWithCheckpointing(
-                        savepointRestoreSettings, jobVertex);
-
-        final DefaultDeclarativeSlotPool declarativeSlotPool =
-                createDeclarativeSlotPool(jobGraphWithNewOperator.getJobID());
-
-        final GloballyTerminalJobStatusListener jobStatusListener =
-                new GloballyTerminalJobStatusListener();
-
-        final AdaptiveScheduler adaptiveScheduler =
-                new AdaptiveSchedulerBuilder(
-                                jobGraphWithNewOperator, singleThreadMainThreadExecutor)
-                        .setDeclarativeSlotPool(declarativeSlotPool)
-                        .setJobStatusListener(jobStatusListener)
-                        .build();
-
-        singleThreadMainThreadExecutor.execute(
-                () -> {
-                    adaptiveScheduler.startScheduling();
-                    offerSlots(
-                            declarativeSlotPool,
-                            createSlotOffersForResourceRequirements(
-                                    ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1)));
-                });
-
-        assertThat(jobStatusListener.getTerminationFuture().join(), is(JobStatus.FAILED));
-
-        final ArchivedExecutionGraph archivedExecutionGraph =
-                CompletableFuture.supplyAsync(
-                                () -> adaptiveScheduler.requestJob().getArchivedExecutionGraph(),
-                                singleThreadMainThreadExecutor)
-                        .join();
-
-        assertThat(archivedExecutionGraph.getState(), is(JobStatus.FAILED));
-        assertThat(
-                archivedExecutionGraph.getFailureInfo().getException(),
-                FlinkMatchers.containsMessage("Failed to rollback to checkpoint/savepoint"));
-    }
-
-    @Test
-    public void testRestoringModifiedJobFromSavepointWithAllowNonRestoredStateSucceeds()
-            throws Exception {
-        // create savepoint data
-        final long savepointId = 42L;
-        final OperatorID operatorID = new OperatorID();
-        final File savepointFile =
-                TestUtils.createSavepointWithOperatorState(
-                        TEMPORARY_FOLDER.newFile(), savepointId, operatorID);
-
-        // allow for non restored state
-        final SavepointRestoreSettings savepointRestoreSettings =
-                SavepointRestoreSettings.forPath(savepointFile.getAbsolutePath(), true);
-
-        // create a new operator
-        final JobVertex jobVertex = new JobVertex("New operator");
-        jobVertex.setInvokableClass(NoOpInvokable.class);
-        jobVertex.setParallelism(1);
-
-        final JobGraph jobGraphWithNewOperator =
-                TestUtils.createJobGraphFromJobVerticesWithCheckpointing(
-                        savepointRestoreSettings, jobVertex);
-
-        final StandaloneCompletedCheckpointStore completedCheckpointStore =
-                new StandaloneCompletedCheckpointStore(1);
-        final CheckpointRecoveryFactory testingCheckpointRecoveryFactory =
-                useSameServicesForAllJobs(
-                        completedCheckpointStore, new StandaloneCheckpointIDCounter());
-
-        final DefaultDeclarativeSlotPool declarativeSlotPool =
-                createDeclarativeSlotPool(jobGraphWithNewOperator.getJobID());
-
-        AdaptiveScheduler adaptiveScheduler =
-                new AdaptiveSchedulerBuilder(
-                                jobGraphWithNewOperator, singleThreadMainThreadExecutor)
-                        .setCheckpointRecoveryFactory(testingCheckpointRecoveryFactory)
-                        .setDeclarativeSlotPool(declarativeSlotPool)
-                        .build();
-
-        final OneShotLatch submitTaskLatch = new OneShotLatch();
-        final TaskManagerGateway taskManagerGateway =
-                createWaitingForTaskSubmissionTaskManagerGateway(submitTaskLatch);
-
-        singleThreadMainThreadExecutor.execute(
-                () -> {
-                    adaptiveScheduler.startScheduling();
-
-                    offerSlots(
-                            declarativeSlotPool,
-                            createSlotOffersForResourceRequirements(
-                                    ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1)),
-                            taskManagerGateway);
-                });
-
-        submitTaskLatch.await();
-
-        // starting and offering the required slots should trigger the ExecutionGraph creation
-        final CompletedCheckpoint savepoint =
-                CompletableFuture.supplyAsync(
-                                FunctionUtils.uncheckedSupplier(
-                                        () -> completedCheckpointStore.getLatestCheckpoint(false)),
-                                singleThreadMainThreadExecutor)
-                        .join();
-
-        MatcherAssert.assertThat(savepoint, notNullValue());
-
-        MatcherAssert.assertThat(savepoint.getCheckpointID(), Matchers.is(savepointId));
-    }
-
     @Nonnull
     private TaskManagerGateway createWaitingForTaskSubmissionTaskManagerGateway(
             OneShotLatch submitTaskLatch) {

Reply via email to