http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
index 88f9ce0..4cba4e3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
@@ -98,6 +98,7 @@ public class ArchivedExecutionGraphTest {
 
                runtimeGraph = new ExecutionGraph(
                        TestingUtils.defaultExecutionContext(),
+                       TestingUtils.defaultExecutionContext(),
                        new JobID(),
                        "test job",
                        new Configuration(),

http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
index 6f6fcd0..bf3a17c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
@@ -112,7 +112,8 @@ public class ExecutionGraphConstructionTest {
                List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v4, v5));
 
                ExecutionGraph eg = new ExecutionGraph(
-                       TestingUtils.defaultExecutionContext(), 
+                       TestingUtils.defaultExecutionContext(),
+                       TestingUtils.defaultExecutionContext(),
                        jobId, 
                        jobName, 
                        cfg,
@@ -161,7 +162,8 @@ public class ExecutionGraphConstructionTest {
                List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3));
 
                ExecutionGraph eg = new ExecutionGraph(
-                       TestingUtils.defaultExecutionContext(), 
+                       TestingUtils.defaultExecutionContext(),
+                       TestingUtils.defaultExecutionContext(),
                        jobId, 
                        jobName, 
                        cfg,
@@ -235,7 +237,8 @@ public class ExecutionGraphConstructionTest {
                List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3));
 
                ExecutionGraph eg = new ExecutionGraph(
-                       TestingUtils.defaultExecutionContext(), 
+                       TestingUtils.defaultExecutionContext(),
+                       TestingUtils.defaultExecutionContext(),
                        jobId, 
                        jobName, 
                        cfg,
@@ -494,7 +497,8 @@ public class ExecutionGraphConstructionTest {
                List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1));
 
                ExecutionGraph eg = new ExecutionGraph(
-                       TestingUtils.defaultExecutionContext(), 
+                       TestingUtils.defaultExecutionContext(),
+                       TestingUtils.defaultExecutionContext(),
                        jobId, 
                        jobName, 
                        cfg,
@@ -558,7 +562,8 @@ public class ExecutionGraphConstructionTest {
                List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v5, v4));
 
                ExecutionGraph eg = new ExecutionGraph(
-                       TestingUtils.defaultExecutionContext(), 
+                       TestingUtils.defaultExecutionContext(),
+                       TestingUtils.defaultExecutionContext(),
                        jobId, 
                        jobName, 
                        cfg,
@@ -626,7 +631,8 @@ public class ExecutionGraphConstructionTest {
                        List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v4, v5));
 
                        ExecutionGraph eg = new ExecutionGraph(
-                               TestingUtils.defaultExecutionContext(), 
+                               TestingUtils.defaultExecutionContext(),
+                               TestingUtils.defaultExecutionContext(),
                                jobId, 
                                jobName, 
                                cfg,
@@ -672,7 +678,8 @@ public class ExecutionGraphConstructionTest {
                        List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3));
 
                        ExecutionGraph eg = new ExecutionGraph(
-                               TestingUtils.defaultExecutionContext(), 
+                               TestingUtils.defaultExecutionContext(),
+                               TestingUtils.defaultExecutionContext(),
                                jobId, 
                                jobName,
                                cfg,
@@ -753,7 +760,8 @@ public class ExecutionGraphConstructionTest {
                        JobGraph jg = new JobGraph(jobId, jobName, v1, v2, v3, v4, v5, v6, v7, v8);
                        
                        ExecutionGraph eg = new ExecutionGraph(
-                               TestingUtils.defaultExecutionContext(), 
+                               TestingUtils.defaultExecutionContext(),
+                               TestingUtils.defaultExecutionContext(),
                                jobId, 
                                jobName, 
                                cfg,

http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index d4acd8c..ef4f74c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -89,7 +89,8 @@ public class ExecutionGraphDeploymentTest {
                        v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL);
 
                        ExecutionGraph eg = new ExecutionGraph(
-                               TestingUtils.defaultExecutionContext(), 
+                               TestingUtils.defaultExecutionContext(),
+                               TestingUtils.defaultExecutionContext(),
                                jobId, 
                                "some job", 
                                new Configuration(),
@@ -313,6 +314,7 @@ public class ExecutionGraphDeploymentTest {
                // execution graph that executes actions synchronously
                ExecutionGraph eg = new ExecutionGraph(
                        TestingUtils.directExecutionContext(),
+                       TestingUtils.defaultExecutionContext(),
                        jobId,
                        "failing test job",
                        new Configuration(),
@@ -356,7 +358,8 @@ public class ExecutionGraphDeploymentTest {
 
                // execution graph that executes actions synchronously
                ExecutionGraph eg = new ExecutionGraph(
-                       TestingUtils.directExecutionContext(), 
+                       TestingUtils.directExecutionContext(),
+                       TestingUtils.defaultExecutionContext(),
                        jobId, 
                        "some job", 
                        new Configuration(), 

http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
index 09e7c3e..d8d8e24 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
@@ -147,6 +147,7 @@ public class ExecutionGraphMetricsTest extends TestLogger {
 
                        ExecutionGraph executionGraph = new ExecutionGraph(
                                executor,
+                               executor,
                                jobGraph.getJobID(),
                                jobGraph.getName(),
                                jobConfig,

http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index d6770a6..52bfc96 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -232,6 +232,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
                // Blocking program
                ExecutionGraph executionGraph = new ExecutionGraph(
                        TestingUtils.defaultExecutionContext(),
+                       TestingUtils.defaultExecutionContext(),
                        new JobID(),
                        "TestJob",
                        new Configuration(),
@@ -547,6 +548,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
 
                ExecutionGraph eg = new ExecutionGraph(
                        TestingUtils.defaultExecutionContext(),
+                       TestingUtils.defaultExecutionContext(),
                        new JobID(),
                        "Test job",
                        new Configuration(),
@@ -679,13 +681,14 @@ public class ExecutionGraphRestartTest extends TestLogger {
 
        private static ExecutionGraph newExecutionGraph(RestartStrategy restartStrategy) throws IOException {
                return new ExecutionGraph(
-                               TestingUtils.defaultExecutionContext(),
-                               new JobID(),
-                               "Test job",
-                               new Configuration(),
-                               new SerializedValue<>(new ExecutionConfig()),
-                               AkkaUtils.getDefaultTimeout(),
-                               restartStrategy);
+                       TestingUtils.defaultExecutionContext(),
+                       TestingUtils.defaultExecutionContext(),
+                       new JobID(),
+                       "Test job",
+                       new Configuration(),
+                       new SerializedValue<>(new ExecutionConfig()),
+                       AkkaUtils.getDefaultTimeout(),
+                       restartStrategy);
        }
 
        private static void restartAfterFailure(ExecutionGraph eg, FiniteDuration timeout, boolean haltAfterRestart) throws InterruptedException {

http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
index de4a026..fde967e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
@@ -133,6 +133,7 @@ public class ExecutionGraphSignalsTest {
 
                eg = new ExecutionGraph(
                        TestingUtils.defaultExecutionContext(),
+                       TestingUtils.defaultExecutionContext(),
                        jobId,
                        jobName,
                        cfg,

http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index 28dff02..71ae3b6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -174,6 +174,7 @@ public class ExecutionGraphTestUtils {
 
                ExecutionGraph graph = new ExecutionGraph(
                        executor,
+                       executor,
                        new JobID(), 
                        "test job", 
                        new Configuration(),

http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
index 8434ed7..19e2d6d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
@@ -52,7 +52,8 @@ public class ExecutionStateProgressTest {
                        ajv.setInvokableClass(mock(AbstractInvokable.class).getClass());
 
                        ExecutionGraph graph = new ExecutionGraph(
-                               TestingUtils.defaultExecutionContext(), 
+                               TestingUtils.defaultExecutionContext(),
+                               TestingUtils.defaultExecutionContext(),
                                jid, 
                                "test job", 
                                new Configuration(),

http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
index 7a28b4a..0e147e3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
@@ -66,7 +66,8 @@ public class PointwisePatternTest {
                List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
 
                ExecutionGraph eg = new ExecutionGraph(
-                       TestingUtils.defaultExecutionContext(), 
+                       TestingUtils.defaultExecutionContext(),
+                       TestingUtils.defaultExecutionContext(),
                        jobId, 
                        jobName, 
                        cfg,
@@ -111,7 +112,8 @@ public class PointwisePatternTest {
                List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
 
                ExecutionGraph eg = new ExecutionGraph(
-                       TestingUtils.defaultExecutionContext(), 
+                       TestingUtils.defaultExecutionContext(),
+                       TestingUtils.defaultExecutionContext(),
                        jobId, 
                        jobName, 
                        cfg,
@@ -157,7 +159,8 @@ public class PointwisePatternTest {
                List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
 
                ExecutionGraph eg = new ExecutionGraph(
-                       TestingUtils.defaultExecutionContext(), 
+                       TestingUtils.defaultExecutionContext(),
+                       TestingUtils.defaultExecutionContext(),
                        jobId, 
                        jobName, 
                        cfg,
@@ -204,7 +207,8 @@ public class PointwisePatternTest {
                List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
 
                ExecutionGraph eg = new ExecutionGraph(
-                       TestingUtils.defaultExecutionContext(), 
+                       TestingUtils.defaultExecutionContext(),
+                       TestingUtils.defaultExecutionContext(),
                        jobId, 
                        jobName,
                        cfg,
@@ -249,7 +253,8 @@ public class PointwisePatternTest {
                List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
 
                ExecutionGraph eg = new ExecutionGraph(
-                       TestingUtils.defaultExecutionContext(), 
+                       TestingUtils.defaultExecutionContext(),
+                       TestingUtils.defaultExecutionContext(),
                        jobId, 
                        jobName, 
                        cfg,
@@ -314,7 +319,8 @@ public class PointwisePatternTest {
                List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
 
                ExecutionGraph eg = new ExecutionGraph(
-                       TestingUtils.defaultExecutionContext(), 
+                       TestingUtils.defaultExecutionContext(),
+                       TestingUtils.defaultExecutionContext(),
                        jobId, 
                        jobName, 
                        cfg,
@@ -370,7 +376,8 @@ public class PointwisePatternTest {
                List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
 
                ExecutionGraph eg = new ExecutionGraph(
-                       TestingUtils.defaultExecutionContext(), 
+                       TestingUtils.defaultExecutionContext(),
+                       TestingUtils.defaultExecutionContext(),
                        jobId, 
                        jobName, 
                        cfg,

http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
index 4459970..5b1a03e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
@@ -181,6 +181,7 @@ public class TerminalStateDeadlockTest {
                TestExecGraph(JobID jobId) throws IOException {
                        super(
                                TestingUtils.defaultExecutionContext(),
+                               TestingUtils.defaultExecutionContext(),
                                jobId,
                                "test graph",
                                EMPTY_CONFIG,

http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
index 0c95695..27708a2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
@@ -80,13 +80,14 @@ public class VertexSlotSharingTest {
                        List<JobVertex> vertices = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v4, v5));
                        
                        ExecutionGraph eg = new ExecutionGraph(
-                                       TestingUtils.defaultExecutionContext(),
-                                       new JobID(),
-                                       "test job",
-                                       new Configuration(),
-                                       new SerializedValue<>(new ExecutionConfig()),
-                                       AkkaUtils.getDefaultTimeout(),
-                                       new NoRestartStrategy());
+                               TestingUtils.defaultExecutionContext(),
+                               TestingUtils.defaultExecutionContext(),
+                               new JobID(),
+                               "test job",
+                               new Configuration(),
+                               new SerializedValue<>(new ExecutionConfig()),
+                               AkkaUtils.getDefaultTimeout(),
+                               new NoRestartStrategy());
                        eg.attachJobGraph(vertices);
                        
                        // verify that the vertices are all in the same slot sharing group

http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategyTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategyTest.java
index c0d59fe..4beedb0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategyTest.java
@@ -39,7 +39,7 @@ public class FixedDelayRestartStrategyTest {
                        restartDelay);
 
                ExecutionGraph executionGraph = mock(ExecutionGraph.class);
-               when(executionGraph.getExecutor())
+               when(executionGraph.getFutureExecutor())
                        .thenReturn(ExecutionContext$.MODULE$.fromExecutor(MoreExecutors.directExecutor()));
 
                while(fixedDelayRestartStrategy.canRestart()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index 7d9c521..69aac31 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -90,6 +90,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.TimeUnit;
 
@@ -135,6 +136,8 @@ public class JobManagerHARecoveryTest {
                flinkConfiguration.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.newFolder().toString());
                flinkConfiguration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, slots);
 
+               ExecutorService executor = null;
+
                try {
                        Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
 
@@ -152,21 +155,24 @@ public class JobManagerHARecoveryTest {
                                        MemoryArchivist.class,
                                        10), "archive");
 
+                       executor = new ForkJoinPool();
+
                        Props jobManagerProps = Props.create(
-                                       TestingJobManager.class,
-                                       flinkConfiguration,
-                                       new ForkJoinPool(),
-                                       instanceManager,
-                                       scheduler,
-                                       new BlobLibraryCacheManager(new BlobServer(flinkConfiguration), 3600000),
-                                       archive,
-                                       new FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(Int.MaxValue(), 100),
-                                       timeout,
-                                       myLeaderElectionService,
-                                       mySubmittedJobGraphStore,
-                                       checkpointStateFactory,
-                                       jobRecoveryTimeout,
-                                       Option.apply(null));
+                               TestingJobManager.class,
+                               flinkConfiguration,
+                               executor,
+                               executor,
+                               instanceManager,
+                               scheduler,
+                               new BlobLibraryCacheManager(new BlobServer(flinkConfiguration), 3600000),
+                               archive,
+                               new FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(Int.MaxValue(), 100),
+                               timeout,
+                               myLeaderElectionService,
+                               mySubmittedJobGraphStore,
+                               checkpointStateFactory,
+                               jobRecoveryTimeout,
+                               Option.apply(null));
 
                        jobManager = system.actorOf(jobManagerProps, "jobmanager");
                        ActorGateway gateway = new AkkaActorGateway(jobManager, leaderSessionID);
@@ -282,6 +288,10 @@ public class JobManagerHARecoveryTest {
                        if (taskManager != null) {
                                taskManager.tell(PoisonPill.getInstance(), ActorRef.noSender());
                        }
+
+                       if (executor != null) {
+                               executor.shutdownNow();
+                       }
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
index 082b1de..f051281 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
@@ -183,20 +183,21 @@ public class JobManagerLeaderElectionTest extends TestLogger {
                CheckpointRecoveryFactory checkpointRecoveryFactory = new StandaloneCheckpointRecoveryFactory();
 
                return Props.create(
-                               TestingJobManager.class,
-                               configuration,
-                               executor,
-                               new InstanceManager(),
-                               new Scheduler(TestingUtils.defaultExecutionContext()),
-                               new BlobLibraryCacheManager(new BlobServer(configuration), 10L),
-                               ActorRef.noSender(),
-                               new NoRestartStrategy.NoRestartStrategyFactory(),
-                               AkkaUtils.getDefaultTimeoutAsFiniteDuration(),
-                               leaderElectionService,
-                               submittedJobGraphStore,
-                               checkpointRecoveryFactory,
-                               AkkaUtils.getDefaultTimeoutAsFiniteDuration(),
-                               Option.apply(null)
+                       TestingJobManager.class,
+                       configuration,
+                       executor,
+                       executor,
+                       new InstanceManager(),
+                       new Scheduler(TestingUtils.defaultExecutionContext()),
+                       new BlobLibraryCacheManager(new BlobServer(configuration), 10L),
+                       ActorRef.noSender(),
+                       new NoRestartStrategy.NoRestartStrategyFactory(),
+                       AkkaUtils.getDefaultTimeoutAsFiniteDuration(),
+                       leaderElectionService,
+                       submittedJobGraphStore,
+                       checkpointRecoveryFactory,
+                       AkkaUtils.getDefaultTimeoutAsFiniteDuration(),
+                       Option.apply(null)
                );
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
index 68cb668..1cbd605 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
@@ -58,6 +58,7 @@ class TaskManagerLossFailsTasksTest extends WordSpecLike with Matchers {
 
         val eg = new ExecutionGraph(
           TestingUtils.defaultExecutionContext,
+          TestingUtils.defaultExecutionContext,
           new JobID(),
           "test job",
           new Configuration(),

http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index 50a5559..269a66f 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.testingUtils
 
-import java.util.concurrent.{ExecutorService, TimeUnit, TimeoutException}
+import java.util.concurrent.{Executor, ExecutorService, TimeUnit, TimeoutException}
 
 import akka.actor.{ActorRef, ActorSystem, Props}
 import akka.pattern.Patterns._
@@ -80,7 +80,8 @@ class TestingCluster(
   override def getJobManagerProps(
     jobManagerClass: Class[_ <: JobManager],
     configuration: Configuration,
-    executorService: ExecutorService,
+    futureExecutor: Executor,
+    ioExecutor: Executor,
     instanceManager: InstanceManager,
     scheduler: Scheduler,
     libraryCacheManager: BlobLibraryCacheManager,
@@ -96,7 +97,8 @@ class TestingCluster(
     val props = super.getJobManagerProps(
       jobManagerClass,
       configuration,
-      executorService,
+      futureExecutor,
+      ioExecutor,
       instanceManager,
       scheduler,
       libraryCacheManager,

http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
index c6fd923..39c7a53 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
@@ -39,7 +39,8 @@ import scala.language.postfixOps
   */
 class TestingJobManager(
     flinkConfiguration: Configuration,
-    executor: Executor,
+    futureExecutor: Executor,
+    ioExecutor: Executor,
     instanceManager: InstanceManager,
     scheduler: Scheduler,
     libraryCacheManager: BlobLibraryCacheManager,
@@ -53,7 +54,8 @@ class TestingJobManager(
     metricRegistry : Option[MetricRegistry])
   extends JobManager(
     flinkConfiguration,
-    executor,
+    futureExecutor,
+    ioExecutor,
     instanceManager,
     scheduler,
     libraryCacheManager,

http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
index 37ea68a..e77cbb3 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
@@ -139,6 +139,7 @@ public class RescalePartitionerTest extends TestLogger {
 
                ExecutionGraph eg = new ExecutionGraph(
                        TestingUtils.defaultExecutionContext(),
+                       TestingUtils.defaultExecutionContext(),
                        jobId,
                        jobName,
                        cfg,

http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
index aabc19d..5244124 100644
--- a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
+++ b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
@@ -40,8 +40,9 @@ import scala.concurrent.duration.FiniteDuration
   * instead of an anonymous class with the respective mixin to obtain a more readable logger name.
   *
   * @param flinkConfiguration Configuration object for the actor
-  * @param executor Execution context which is used to execute concurrent tasks in the
+  * @param futureExecutor Execution context which is used to execute concurrent tasks in the
   *                         [[org.apache.flink.runtime.executiongraph.ExecutionGraph]]
+  * @param ioExecutor for blocking io operations
   * @param instanceManager Instance manager to manage the registered
   *                        [[org.apache.flink.runtime.taskmanager.TaskManager]]
   * @param scheduler Scheduler to schedule Flink jobs
@@ -53,7 +54,8 @@ import scala.concurrent.duration.FiniteDuration
   */
 class TestingYarnJobManager(
     flinkConfiguration: Configuration,
-    executor: Executor,
+    futureExecutor: Executor,
+    ioExecutor: Executor,
     instanceManager: InstanceManager,
     scheduler: Scheduler,
     libraryCacheManager: BlobLibraryCacheManager,
@@ -67,7 +69,8 @@ class TestingYarnJobManager(
     metricRegistry : Option[MetricRegistry])
   extends YarnJobManager(
     flinkConfiguration,
-    executor,
+    futureExecutor,
+    ioExecutor,
     instanceManager,
     scheduler,
     libraryCacheManager,

http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index da5959b..8f2cc33 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -60,6 +60,8 @@ import org.apache.hadoop.yarn.util.Records;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import scala.Option;
+import scala.Some;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.File;
@@ -339,8 +341,8 @@ public class YarnApplicationMasterRunner {
                                actorSystem,
                                futureExecutor,
                                ioExecutor,
-                               new scala.Some<>(JobManager.JOB_MANAGER_NAME()),
-                               scala.Option.<String>empty(),
+                               new Some<>(JobManager.JOB_MANAGER_NAME()),
+                               Option.<String>empty(),
                                getJobManagerClass(),
                                getArchivistClass())._1();
 
@@ -379,7 +381,6 @@ public class YarnApplicationMasterRunner {
 
                        ActorRef resourceMaster = actorSystem.actorOf(resourceMasterProps);
 
-
                        // 4: Process reapers
                        // The process reapers ensure that upon unexpected actor death, the process exits
                        // and does not stay lingering around unresponsive
@@ -414,6 +415,9 @@ public class YarnApplicationMasterRunner {
                                }
                        }
 
+                       futureExecutor.shutdownNow();
+                       ioExecutor.shutdownNow();
+
                        return INIT_ERROR_EXIT_CODE;
                }
 
@@ -432,8 +436,11 @@ public class YarnApplicationMasterRunner {
                        }
                }
 
-               futureExecutor.shutdownNow();
-               ioExecutor.shutdownNow();
+               org.apache.flink.runtime.concurrent.Executors.gracefulShutdown(
+                       AkkaUtils.getTimeout(config).toMillis(),
+                       TimeUnit.MILLISECONDS,
+                       futureExecutor,
+                       ioExecutor);
 
                return 0;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
index a81e6cf..db4eea8 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
@@ -40,8 +40,9 @@ import scala.language.postfixOps
   * to start/administer/stop the Yarn session.
   *
   * @param flinkConfiguration Configuration object for the actor
-  * @param executor Execution context which is used to execute concurrent tasks in the
+  * @param futureExecutor Execution context which is used to execute concurrent tasks in the
   *                         [[org.apache.flink.runtime.executiongraph.ExecutionGraph]]
+  * @param ioExecutor for blocking io operations
   * @param instanceManager Instance manager to manage the registered
   *                        [[org.apache.flink.runtime.taskmanager.TaskManager]]
   * @param scheduler Scheduler to schedule Flink jobs
@@ -53,7 +54,8 @@ import scala.language.postfixOps
   */
 class YarnJobManager(
     flinkConfiguration: FlinkConfiguration,
-    executor: Executor,
+    futureExecutor: Executor,
+    ioExecutor: Executor,
     instanceManager: InstanceManager,
     scheduler: FlinkScheduler,
     libraryCacheManager: BlobLibraryCacheManager,
@@ -67,7 +69,8 @@ class YarnJobManager(
     metricsRegistry: Option[MetricRegistry])
   extends ContaineredJobManager(
     flinkConfiguration,
-    executor,
+    futureExecutor,
+    ioExecutor,
     instanceManager,
     scheduler,
     libraryCacheManager,

Reply via email to