Repository: flink
Updated Branches:
  refs/heads/release-1.1 68585d145 -> cf4b22127


[FLINK-5082] Pull ExecutorService lifecycle management out of the JobManager

The provided ExecutorService will no longer be closed by the JobManager.
Instead, its lifecycle is managed outside of the JobManager, in the place
where it was created. This gives nicer behaviour because it better
separates responsibilities.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7fb71c5b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7fb71c5b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7fb71c5b

Branch: refs/heads/release-1.1
Commit: 7fb71c5bf1aab85250bf29bd0ea0654079cea48f
Parents: 68585d1
Author: Till Rohrmann <[email protected]>
Authored: Wed Nov 16 18:33:54 2016 +0100
Committer: Till Rohrmann <[email protected]>
Committed: Mon Nov 21 15:53:19 2016 +0100

----------------------------------------------------------------------
 .../BackPressureStatsTrackerITCase.java         |  2 +-
 .../StackTraceSampleCoordinatorITCase.java      |  2 +-
 .../webmonitor/WebRuntimeMonitorITCase.java     |  1 +
 .../flink/runtime/util/NamedThreadFactory.java  | 58 ++++++++++++++++
 .../flink/runtime/jobmanager/JobManager.scala   | 71 +++++++++++--------
 .../runtime/minicluster/FlinkMiniCluster.scala  | 25 ++++---
 .../minicluster/LocalFlinkMiniCluster.scala     |  1 +
 .../testingUtils/TestingJobManager.scala        |  8 +--
 .../runtime/jobmanager/JobManagerTest.java      | 26 +++----
 .../flink/runtime/jobmanager/JobSubmitTest.java |  7 +-
 .../resourcemanager/ClusterShutdownITCase.java  |  4 +-
 .../resourcemanager/ResourceManagerITCase.java  |  4 +-
 ...askManagerComponentsStartupShutdownTest.java |  1 +
 .../TaskManagerProcessReapingTestBase.java      |  1 +
 .../TaskManagerRegistrationTest.java            |  8 ++-
 .../jobmanager/JobManagerRegistrationTest.scala |  1 +
 .../runtime/testingUtils/TestingCluster.scala   |  6 +-
 .../runtime/testingUtils/TestingUtils.scala     | 72 +++++---------------
 .../test/util/ForkableFlinkMiniCluster.scala    |  1 +
 ...ctTaskManagerProcessFailureRecoveryTest.java |  1 +
 .../recovery/ProcessFailureCancelingITCase.java |  1 +
 .../flink/yarn/TestingYarnJobManager.scala      |  8 +--
 .../flink/yarn/YarnApplicationMasterRunner.java | 19 +++++-
 .../org/apache/flink/yarn/YarnJobManager.scala  | 15 ++--
 24 files changed, 205 insertions(+), 138 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7fb71c5b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
index 25dc189..9fbbd90 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
@@ -120,7 +120,7 @@ public class BackPressureStatsTrackerITCase extends 
TestLogger {
                        }
 
                        try {
-                               jobManger = 
TestingUtils.createJobManager(testActorSystem, new Configuration());
+                               jobManger = 
TestingUtils.createJobManager(testActorSystem, testActorSystem.dispatcher(), 
new Configuration());
 
                                Configuration config = new Configuration();
                                
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism);

http://git-wip-us.apache.org/repos/asf/flink/blob/7fb71c5b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
index 9b1f608..868dae1 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
@@ -90,7 +90,7 @@ public class StackTraceSampleCoordinatorITCase extends 
TestLogger {
                        ActorGateway taskManager = null;
 
                        try {
-                               jobManger = 
TestingUtils.createJobManager(testActorSystem, new Configuration());
+                               jobManger = 
TestingUtils.createJobManager(testActorSystem, testActorSystem.dispatcher(), 
new Configuration());
 
                                Configuration config = new Configuration();
                                
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism);

http://git-wip-us.apache.org/repos/asf/flink/blob/7fb71c5b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
index 677ff54..a6b958a 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
@@ -178,6 +178,7 @@ public class WebRuntimeMonitorITCase extends TestLogger {
                                jobManager[i] = 
JobManager.startJobManagerActors(
                                        jmConfig,
                                        jobManagerSystem[i],
+                                       jobManagerSystem[i].dispatcher(),
                                        JobManager.class,
                                        MemoryArchivist.class)._1();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7fb71c5b/flink-runtime/src/main/java/org/apache/flink/runtime/util/NamedThreadFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/NamedThreadFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/NamedThreadFactory.java
new file mode 100644
index 0000000..bd97963
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/NamedThreadFactory.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Thread factory which allows to specify a thread pool name and a thread name.
+ *
+ * The code is based on {@link 
java.util.concurrent.Executors.DefaultThreadFactory}.
+ */
+public class NamedThreadFactory implements ThreadFactory {
+       private static final AtomicInteger poolNumber = new AtomicInteger(1);
+       private final ThreadGroup group;
+       private final AtomicInteger threadNumber = new AtomicInteger(1);
+       private final String namePrefix;
+
+       public NamedThreadFactory(final String poolName, final String 
threadName) {
+               SecurityManager securityManager = System.getSecurityManager();
+               group = (securityManager != null) ? 
securityManager.getThreadGroup() :
+                       Thread.currentThread().getThreadGroup();
+
+               namePrefix = poolName +
+                       poolNumber.getAndIncrement() +
+                       threadName;
+       }
+
+       @Override
+       public Thread newThread(Runnable runnable) {
+               Thread t = new Thread(group, runnable,
+                       namePrefix + threadNumber.getAndIncrement(),
+                       0);
+               if (t.isDaemon()) {
+                       t.setDaemon(false);
+               }
+               if (t.getPriority() != Thread.NORM_PRIORITY) {
+                       t.setPriority(Thread.NORM_PRIORITY);
+               }
+               return t;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7fb71c5b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 41218c9..c6e18e9 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -22,7 +22,7 @@ import java.io.{File, IOException}
 import java.lang.management.ManagementFactory
 import java.net._
 import java.util.UUID
-import java.util.concurrent.{ExecutorService, TimeUnit, TimeoutException}
+import java.util.concurrent.{TimeUnit, Future => _, TimeoutException => _, _}
 import javax.management.ObjectName
 
 import akka.actor.Status.Failure
@@ -50,7 +50,7 @@ import 
org.apache.flink.runtime.execution.SuppressRestartsException
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
 import org.apache.flink.runtime.executiongraph.{ExecutionGraph, 
ExecutionGraphException, ExecutionJobVertex}
-import org.apache.flink.runtime.instance.{AkkaActorGateway, InstanceManager}
+import org.apache.flink.runtime.instance.{AkkaActorGateway, Hardware, 
InstanceManager}
 import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator
 import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus, JobVertexID}
 import 
org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore.SubmittedJobGraphListener
@@ -114,7 +114,7 @@ import scala.language.postfixOps
  */
 class JobManager(
     protected val flinkConfiguration: Configuration,
-    protected val executorService: ExecutorService,
+    protected val executor: Executor,
     protected val instanceManager: InstanceManager,
     protected val scheduler: FlinkScheduler,
     protected val libraryCacheManager: BlobLibraryCacheManager,
@@ -137,7 +137,7 @@ class JobManager(
 
   /** The extra execution context, for futures, with a custom logging reporter 
*/
   protected val executionContext: ExecutionContext = 
ExecutionContext.fromExecutor(
-    executorService,
+    executor,
     (t: Throwable) => {
       if (!context.system.isTerminated) {
         log.error("Executor could not execute task", t)
@@ -277,9 +277,6 @@ class JobManager(
       case e: IOException => log.error("Could not properly shutdown the 
library cache manager.", e)
     }
 
-    // shut down the extra thread pool for futures
-    executorService.shutdown()
-
     // failsafe shutdown of the metrics registry
     try {
       metricsRegistry.map(_.shutdown())
@@ -2013,15 +2010,29 @@ object JobManager {
       listeningPort: Int)
     : Unit = {
 
-    val (jobManagerSystem, _, _, webMonitorOption, _) = 
startActorSystemAndJobManagerActors(
-      configuration,
-      executionMode,
-      listeningAddress,
-      listeningPort,
-      classOf[JobManager],
-      classOf[MemoryArchivist],
-      Option(classOf[StandaloneResourceManager])
-    )
+    val numberProcessors = Hardware.getNumberCPUCores()
+
+    val executor = Executors.newFixedThreadPool(
+      numberProcessors,
+      new NamedThreadFactory("jobmanager-future-", "-thread-"))
+
+    val (jobManagerSystem, _, _, webMonitorOption, _) = try {
+      startActorSystemAndJobManagerActors(
+        configuration,
+        executionMode,
+        listeningAddress,
+        listeningPort,
+        executor,
+        classOf[JobManager],
+        classOf[MemoryArchivist],
+        Option(classOf[StandaloneResourceManager])
+      )
+    } catch {
+      case t: Throwable =>
+          executor.shutdownNow()
+
+        throw t
+    }
 
     // block until everything is shut down
     jobManagerSystem.awaitTermination()
@@ -2035,6 +2046,8 @@ object JobManager {
             LOG.warn("Could not properly stop the web monitor.", t)
         }
     }
+
+    executor.shutdownNow()
   }
 
   /**
@@ -2142,6 +2155,7 @@ object JobManager {
     *                      additional TaskManager in the same process.
     * @param listeningAddress The hostname where the JobManager should listen 
for messages.
     * @param listeningPort The port where the JobManager should listen for 
messages
+    * @param executor to run the JobManager's futures
     * @param jobManagerClass The class of the JobManager to be started
     * @param archiveClass The class of the Archivist to be started
     * @param resourceManagerClass Optional class of resource manager if one 
should be started
@@ -2153,6 +2167,7 @@ object JobManager {
       executionMode: JobManagerMode,
       listeningAddress: String,
       listeningPort: Int,
+      executor: Executor,
       jobManagerClass: Class[_ <: JobManager],
       archiveClass: Class[_ <: MemoryArchivist],
       resourceManagerClass: Option[Class[_ <: FlinkResourceManager[_]]])
@@ -2221,6 +2236,7 @@ object JobManager {
       val (jobManager, archive) = startJobManagerActors(
         configuration,
         jobManagerSystem,
+        executor,
         jobManagerClass,
         archiveClass)
 
@@ -2424,15 +2440,16 @@ object JobManager {
    *              delayBetweenRetries, timeout)
    *
    * @param configuration The configuration from which to parse the config 
values.
+   * @param executor to run JobManager's futures
    * @param leaderElectionServiceOption LeaderElectionService which shall be 
returned if the option
    *                                    is defined
    * @return The members for a default JobManager.
    */
   def createJobManagerComponents(
       configuration: Configuration,
+      executor: Executor,
       leaderElectionServiceOption: Option[LeaderElectionService]) :
-    (ExecutorService,
-    InstanceManager,
+    (InstanceManager,
     FlinkScheduler,
     BlobLibraryCacheManager,
     RestartStrategyFactory,
@@ -2462,13 +2479,11 @@ object JobManager {
     var instanceManager: InstanceManager = null
     var scheduler: FlinkScheduler = null
     var libraryCacheManager: BlobLibraryCacheManager = null
-
-    val executorService: ExecutorService = new ForkJoinPool()
     
     try {
       blobServer = new BlobServer(configuration)
       instanceManager = new InstanceManager()
-      scheduler = new 
FlinkScheduler(ExecutionContext.fromExecutor(executorService))
+      scheduler = new FlinkScheduler(ExecutionContext.fromExecutor(executor))
       libraryCacheManager = new BlobLibraryCacheManager(blobServer, 
cleanupInterval)
 
       instanceManager.addInstanceListener(scheduler)
@@ -2487,7 +2502,6 @@ object JobManager {
         if (blobServer != null) {
           blobServer.shutdown()
         }
-        executorService.shutdownNow()
         
         throw t
     }
@@ -2542,8 +2556,7 @@ object JobManager {
         None
     }
 
-    (executorService,
-      instanceManager,
+    (instanceManager,
       scheduler,
       libraryCacheManager,
       restartStrategy,
@@ -2570,6 +2583,7 @@ object JobManager {
   def startJobManagerActors(
       configuration: Configuration,
       actorSystem: ActorSystem,
+      executor: Executor,
       jobManagerClass: Class[_ <: JobManager],
       archiveClass: Class[_ <: MemoryArchivist])
     : (ActorRef, ActorRef) = {
@@ -2577,6 +2591,7 @@ object JobManager {
     startJobManagerActors(
       configuration,
       actorSystem,
+      executor,
       Some(JOB_MANAGER_NAME),
       Some(ARCHIVE_NAME),
       jobManagerClass,
@@ -2589,6 +2604,7 @@ object JobManager {
    *
    * @param configuration The configuration for the JobManager
    * @param actorSystem The actor system running the JobManager
+   * @param executor to run JobManager's futures
    * @param jobManagerActorName Optionally the name of the JobManager actor. 
If none is given,
    *                          the actor will have the name generated by the 
actor system.
    * @param archiveActorName Optionally the name of the archive actor. If none 
is given,
@@ -2600,14 +2616,14 @@ object JobManager {
   def startJobManagerActors(
       configuration: Configuration,
       actorSystem: ActorSystem,
+      executor: Executor,
       jobManagerActorName: Option[String],
       archiveActorName: Option[String],
       jobManagerClass: Class[_ <: JobManager],
       archiveClass: Class[_ <: MemoryArchivist])
     : (ActorRef, ActorRef) = {
 
-    val (executorService: ExecutorService,
-    instanceManager,
+    val (instanceManager,
     scheduler,
     libraryCacheManager,
     restartStrategy,
@@ -2620,6 +2636,7 @@ object JobManager {
     jobRecoveryTimeout, 
     metricsRegistry) = createJobManagerComponents(
       configuration,
+      executor,
       None)
 
     val archiveProps = Props(archiveClass, archiveCount)
@@ -2633,7 +2650,7 @@ object JobManager {
     val jobManagerProps = Props(
       jobManagerClass,
       configuration,
-      executorService,
+      executor,
       instanceManager,
       scheduler,
       libraryCacheManager,

http://git-wip-us.apache.org/repos/asf/flink/blob/7fb71c5b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index 5074b8c..271535e 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -20,26 +20,23 @@ package org.apache.flink.runtime.minicluster
 
 import java.net.InetAddress
 import java.util.UUID
+import java.util.concurrent.Executors
 
 import akka.pattern.Patterns.gracefulStop
 import akka.pattern.ask
 import akka.actor.{ActorRef, ActorSystem}
-
 import com.typesafe.config.Config
-
-import org.apache.flink.api.common.{JobID, JobExecutionResult, 
JobSubmissionResult}
+import org.apache.flink.api.common.{JobExecutionResult, JobID, 
JobSubmissionResult}
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
 import org.apache.flink.runtime.akka.AkkaUtils
-import org.apache.flink.runtime.client.{JobExecutionException, JobClient}
-import org.apache.flink.runtime.instance.{AkkaActorGateway, ActorGateway}
+import org.apache.flink.runtime.client.{JobClient, JobExecutionException}
+import org.apache.flink.runtime.instance.{ActorGateway, AkkaActorGateway, 
Hardware}
 import org.apache.flink.runtime.jobgraph.JobGraph
 import org.apache.flink.runtime.jobmanager.RecoveryMode
-import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalService, 
LeaderRetrievalListener,
-StandaloneLeaderRetrievalService}
+import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalListener, 
LeaderRetrievalService, StandaloneLeaderRetrievalService}
 import 
org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegisteredAtJobManager
-import org.apache.flink.runtime.util.ZooKeeperUtils
-import org.apache.flink.runtime.webmonitor.{WebMonitorUtils, WebMonitor}
-
+import org.apache.flink.runtime.util.{NamedThreadFactory, ZooKeeperUtils}
+import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils}
 import org.slf4j.LoggerFactory
 
 import scala.concurrent.duration.{Duration, FiniteDuration}
@@ -109,6 +106,12 @@ abstract class FlinkMiniCluster(
 
   private var isRunning = false
 
+  val executor = Executors.newFixedThreadPool(
+    Hardware.getNumberCPUCores(),
+    new NamedThreadFactory("mini-cluster-future-", "-thread-"))
+
+
+
   // --------------------------------------------------------------------------
   //                           Abstract Methods
   // --------------------------------------------------------------------------
@@ -370,6 +373,8 @@ abstract class FlinkMiniCluster(
 
     jobManagerLeaderRetrievalService.foreach(_.stop())
     isRunning = false
+
+    executor.shutdownNow
   }
 
   protected def shutdown(): Unit = {

http://git-wip-us.apache.org/repos/asf/flink/blob/7fb71c5b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index 5bebd48..594997c 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -82,6 +82,7 @@ class LocalFlinkMiniCluster(
     val (jobManager, _) = JobManager.startJobManagerActors(
       config,
       system,
+      executor,
       Some(jobManagerName),
       Some(archiveName),
       classOf[JobManager],

http://git-wip-us.apache.org/repos/asf/flink/blob/7fb71c5b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
index 16331ac..e9db9b0 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.testingUtils
 
 import akka.actor.ActorRef
-
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
 import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore
@@ -33,15 +32,14 @@ import org.apache.flink.runtime.metrics.MetricRegistry
 
 import scala.concurrent.duration._
 import scala.language.postfixOps
-
-import java.util.concurrent.ExecutorService
+import java.util.concurrent.Executor
 
 /** JobManager implementation extended by testing messages
   *
   */
 class TestingJobManager(
     flinkConfiguration: Configuration,
-    executorService: ExecutorService,
+    executor: Executor,
     instanceManager: InstanceManager,
     scheduler: Scheduler,
     libraryCacheManager: BlobLibraryCacheManager,
@@ -56,7 +54,7 @@ class TestingJobManager(
     metricRegistry : Option[MetricRegistry])
   extends JobManager(
     flinkConfiguration,
-      executorService,
+    executor,
     instanceManager,
     scheduler,
     libraryCacheManager,

http://git-wip-us.apache.org/repos/asf/flink/blob/7fb71c5b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index d60e060..148e88f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -388,12 +388,13 @@ public class JobManagerTest extends TestLogger {
                        actorSystem = AkkaUtils.createLocalActorSystem(new 
Configuration());
 
                        Tuple2<ActorRef, ActorRef> master = 
JobManager.startJobManagerActors(
-                                       config,
-                                       actorSystem,
-                                       Option.apply("jm"),
-                                       Option.apply("arch"),
-                                       TestingJobManager.class,
-                                       TestingMemoryArchivist.class);
+                               config,
+                               actorSystem,
+                               actorSystem.dispatcher(),
+                               Option.apply("jm"),
+                               Option.apply("arch"),
+                               TestingJobManager.class,
+                               TestingMemoryArchivist.class);
 
                        jobManager = new AkkaActorGateway(master._1(), null);
                        archiver = new AkkaActorGateway(master._2(), null);
@@ -481,12 +482,13 @@ public class JobManagerTest extends TestLogger {
                        actorSystem = AkkaUtils.createLocalActorSystem(new 
Configuration());
 
                        Tuple2<ActorRef, ActorRef> master = 
JobManager.startJobManagerActors(
-                                       new Configuration(),
-                                       actorSystem,
-                                       Option.apply("jm"),
-                                       Option.apply("arch"),
-                                       TestingJobManager.class,
-                                       TestingMemoryArchivist.class);
+                               new Configuration(),
+                               actorSystem,
+                               actorSystem.dispatcher(),
+                               Option.apply("jm"),
+                               Option.apply("arch"),
+                               TestingJobManager.class,
+                               TestingMemoryArchivist.class);
 
                        jobManager = new AkkaActorGateway(master._1(), null);
                        archiver = new AkkaActorGateway(master._2(), null);

http://git-wip-us.apache.org/repos/asf/flink/blob/7fb71c5b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
index 536b729..42ed25b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
@@ -79,9 +79,10 @@ public class JobSubmitTest {
                // only start JobManager (no ResourceManager)
                JobManager.startJobManagerActors(
                                config,
-                               jobManagerSystem,
-                               JobManager.class,
-                               MemoryArchivist.class)._1();
+                       jobManagerSystem,
+                       jobManagerSystem.dispatcher(),
+                       JobManager.class,
+                       MemoryArchivist.class)._1();
 
                try {
                        LeaderRetrievalService lrs = 
LeaderRetrievalUtils.createLeaderRetrievalService(config);

http://git-wip-us.apache.org/repos/asf/flink/blob/7fb71c5b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ClusterShutdownITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ClusterShutdownITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ClusterShutdownITCase.java
index 32c6cac..8530ce6 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ClusterShutdownITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ClusterShutdownITCase.java
@@ -72,7 +72,7 @@ public class ClusterShutdownITCase extends TestLogger {
 
                        // start job manager which doesn't shutdown the actor 
system
                        ActorGateway jobManager =
-                               TestingUtils.createJobManager(system, config, 
"jobmanager1");
+                               TestingUtils.createJobManager(system, 
system.dispatcher(), config, "jobmanager1");
 
                        // Tell the JobManager to inform us of shutdown actions
                        
jobManager.tell(TestingMessages.getNotifyOfComponentShutdown(), me);
@@ -114,7 +114,7 @@ public class ClusterShutdownITCase extends TestLogger {
 
                        // start job manager which doesn't shutdown the actor 
system
                        ActorGateway jobManager =
-                               TestingUtils.createJobManager(system, config, 
"jobmanager2");
+                               TestingUtils.createJobManager(system, 
system.dispatcher(), config, "jobmanager2");
 
                        // Tell the JobManager to inform us of shutdown actions
                        
jobManager.tell(TestingMessages.getNotifyOfComponentShutdown(), me);

http://git-wip-us.apache.org/repos/asf/flink/blob/7fb71c5b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java
index ca09634..bfc6abe 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java
@@ -72,7 +72,7 @@ public class ResourceManagerITCase extends TestLogger {
                protected void run() {
 
                        ActorGateway jobManager =
-                               TestingUtils.createJobManager(system, config, 
"ReconciliationTest");
+                               TestingUtils.createJobManager(system, 
system.dispatcher(), config, "ReconciliationTest");
                        ActorGateway me =
                                TestingUtils.createForwardingActor(system, 
getTestActor(), Option.<String>empty());
 
@@ -125,7 +125,7 @@ public class ResourceManagerITCase extends TestLogger {
                protected void run() {
 
                        ActorGateway jobManager =
-                               TestingUtils.createJobManager(system, config, "RegTest");
+                               TestingUtils.createJobManager(system, system.dispatcher(), config, "RegTest");
                        ActorGateway me =
                                TestingUtils.createForwardingActor(system, getTestActor(), Option.<String>empty());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7fb71c5b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index b4c456c..46bc7a5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -80,6 +80,7 @@ public class TaskManagerComponentsStartupShutdownTest {
                        final ActorRef jobManager = JobManager.startJobManagerActors(
                                config,
                                actorSystem,
+                               actorSystem.dispatcher(),
                                JobManager.class,
                                MemoryArchivist.class)._1();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7fb71c5b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
index c7913f7..63c1b29 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
@@ -102,6 +102,7 @@ public abstract class TaskManagerProcessReapingTestBase {
                        ActorRef jmActor = JobManager.startJobManagerActors(
                                new Configuration(),
                                jmActorSystem,
+                               jmActorSystem.dispatcher(),
                                JobManager.class,
                                MemoryArchivist.class)._1;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7fb71c5b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
index e23aba7..52d500d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
@@ -112,7 +112,7 @@ public class TaskManagerRegistrationTest extends TestLogger {
 
                        try {
                                // a simple JobManager
-                               jobManager = createJobManager(actorSystem, config);
+                               jobManager = createJobManager(actorSystem, actorSystem.dispatcher(), config);
                                startResourceManager(config, jobManager.actor());
 
                                // start two TaskManagers. it will automatically try to register
@@ -193,8 +193,9 @@ public class TaskManagerRegistrationTest extends TestLogger {
 
                                // now start the JobManager, with the regular akka URL
                                jobManager = createJobManager(
-                                               actorSystem,
-                                               new Configuration());
+                                       actorSystem,
+                                       actorSystem.dispatcher(),
+                                       new Configuration());
 
                                startResourceManager(config, jobManager.actor());
 
@@ -698,6 +699,7 @@ public class TaskManagerRegistrationTest extends TestLogger {
                return JobManager.startJobManagerActors(
                        configuration,
                        actorSystem,
+                       actorSystem.dispatcher(),
                        NONE_STRING,
                        NONE_STRING,
                        JobManager.class,

http://git-wip-us.apache.org/repos/asf/flink/blob/7fb71c5b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
index 7feb949..b35cdb4 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
@@ -168,6 +168,7 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll {
     val (jm: ActorRef, _) = JobManager.startJobManagerActors(
       new Configuration(),
       _system,
+      _system.dispatcher,
       None,
       None,
       classOf[JobManager],

http://git-wip-us.apache.org/repos/asf/flink/blob/7fb71c5b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index b4ba40b..c3f846e 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -96,8 +96,7 @@ class TestingCluster(
       config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort + index)
     }
 
-    val (executionContext,
-    instanceManager,
+    val (instanceManager,
     scheduler,
     libraryCacheManager,
     restartStrategyFactory,
@@ -110,6 +109,7 @@ class TestingCluster(
     jobRecoveryTimeout,
     metricRegistry) = JobManager.createJobManagerComponents(
       config,
+      executor,
       createLeaderElectionService())
 
     val testArchiveProps = Props(new TestingMemoryArchivist(archiveCount))
@@ -118,7 +118,7 @@ class TestingCluster(
     val jobManagerProps = Props(
       new TestingJobManager(
         configuration,
-        executionContext,
+        executor,
         instanceManager,
         scheduler,
         libraryCacheManager,

http://git-wip-us.apache.org/repos/asf/flink/blob/7fb71c5b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index 02a0fec..576993d 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -19,26 +19,25 @@
 package org.apache.flink.runtime.testingUtils
 
 import java.util.UUID
+import java.util.concurrent.Executor
 
-import akka.actor.{Props, Kill, ActorSystem, ActorRef}
+import akka.actor.{ActorRef, ActorSystem, Kill, Props}
 import akka.pattern.ask
 import com.google.common.util.concurrent.MoreExecutors
-
 import com.typesafe.config.ConfigFactory
 import grizzled.slf4j.Logger
 import org.apache.flink.api.common.JobExecutionResult
-
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
 import org.apache.flink.runtime.client.JobClient
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager
 import org.apache.flink.runtime.jobgraph.JobGraph
 import org.apache.flink.runtime.clusterframework.types.ResourceID
-import org.apache.flink.runtime.jobmanager.{MemoryArchivist, JobManager}
+import org.apache.flink.runtime.jobmanager.{JobManager, MemoryArchivist}
 import org.apache.flink.runtime.testutils.TestingResourceManager
 import org.apache.flink.runtime.util.LeaderRetrievalUtils
-import org.apache.flink.runtime.{LogMessages, LeaderSessionMessageFilter, FlinkActor}
+import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages}
 import org.apache.flink.runtime.akka.AkkaUtils
-import org.apache.flink.runtime.instance.{AkkaActorGateway, ActorGateway}
+import org.apache.flink.runtime.instance.{ActorGateway, AkkaActorGateway}
 import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService
 import org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegisteredAtJobManager
 import org.apache.flink.runtime.taskmanager.TaskManager
@@ -304,15 +303,18 @@ object TestingUtils {
   /** Creates a testing JobManager using the default recovery mode (standalone)
     *
     * @param actorSystem The ActorSystem to use
+    * @param executor to run the JobManager's futures
     * @param configuration The Flink configuration
     * @return
     */
   def createJobManager(
       actorSystem: ActorSystem,
+      executor: Executor,
       configuration: Configuration)
     : ActorGateway = {
     createJobManager(
       actorSystem,
+      executor,
       configuration,
       classOf[TestingJobManager],
       ""
@@ -323,86 +325,43 @@ object TestingUtils {
     * Additional prefix can be supplied for the Actor system names
     *
     * @param actorSystem The ActorSystem to use
+    * @param executor to run the JobManager's futures
     * @param configuration The Flink configuration
     * @param prefix The prefix for the actor names
     * @return
     */
   def createJobManager(
       actorSystem: ActorSystem,
+      executor: Executor,
       configuration: Configuration,
       prefix: String)
     : ActorGateway = {
     createJobManager(
       actorSystem,
+      executor,
       configuration,
       classOf[TestingJobManager],
       prefix
     )
   }
 
-  def createJobManager(
-      actorSystem: ActorSystem,
-      configuration: Configuration,
-      executionContext: ExecutionContext)
-    : ActorGateway = {
-
-    val (_,
-    instanceManager,
-    scheduler,
-    libraryCacheManager,
-    restartStrategy,
-    timeout,
-    archiveCount,
-    leaderElectionService,
-    submittedJobGraphs,
-    checkpointRecoveryFactory,
-    savepointStore,
-    jobRecoveryTimeout,
-    metricsRegistry) = JobManager.createJobManagerComponents(
-      configuration,
-      None
-    )
-
-    val archiveProps = Props(classOf[TestingMemoryArchivist], archiveCount)
-
-    val archive: ActorRef = actorSystem.actorOf(archiveProps, JobManager.ARCHIVE_NAME)
-
-    val jobManagerProps = Props(
-      classOf[TestingJobManager],
-      configuration,
-      executionContext,
-      instanceManager,
-      scheduler,
-      libraryCacheManager,
-      archive,
-      restartStrategy,
-      timeout,
-      leaderElectionService,
-      submittedJobGraphs,
-      checkpointRecoveryFactory,
-      jobRecoveryTimeout,
-      metricsRegistry)
-
-    val jobManager: ActorRef = actorSystem.actorOf(jobManagerProps, JobManager.JOB_MANAGER_NAME)
-
-    new AkkaActorGateway(jobManager, null)
-  }
-
   /**
     * Creates a JobManager of the given class using the default recovery mode (standalone)
     *
     * @param actorSystem ActorSystem to use
+    * @param executor to run the JobManager's futures
     * @param configuration Configuration to use
     * @param jobManagerClass JobManager class to instantiate
     * @return
     */
   def createJobManager(
       actorSystem: ActorSystem,
+      executor: Executor,
       configuration: Configuration,
       jobManagerClass: Class[_ <: JobManager])
     : ActorGateway = {
 
-    createJobManager(actorSystem, configuration, jobManagerClass, "")
+    createJobManager(actorSystem, executor, configuration, jobManagerClass, "")
   }
 
   /**
@@ -410,6 +369,7 @@ object TestingUtils {
     * Additional prefix for the Actor names can be added.
     *
     * @param actorSystem ActorSystem to use
+    * @param executor to run the JobManager's futures
     * @param configuration Configuration to use
     * @param jobManagerClass JobManager class to instantiate
     * @param prefix The prefix to use for the Actor names
@@ -418,6 +378,7 @@ object TestingUtils {
     */
   def createJobManager(
       actorSystem: ActorSystem,
+      executor: Executor,
       configuration: Configuration,
       jobManagerClass: Class[_ <: JobManager],
       prefix: String)
@@ -428,6 +389,7 @@ object TestingUtils {
       val (actor, _) = JobManager.startJobManagerActors(
         configuration,
         actorSystem,
+        executor,
         Some(prefix + JobManager.JOB_MANAGER_NAME),
         Some(prefix + JobManager.ARCHIVE_NAME),
         jobManagerClass,

http://git-wip-us.apache.org/repos/asf/flink/blob/7fb71c5b/flink-test-utils-parent/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git 
a/flink-test-utils-parent/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
 
b/flink-test-utils-parent/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
index 79c5a25..f2a4c5c 100644
--- 
a/flink-test-utils-parent/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
+++ 
b/flink-test-utils-parent/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
@@ -103,6 +103,7 @@ class ForkableFlinkMiniCluster(
     val (jobManager, _) = JobManager.startJobManagerActors(
       config,
       actorSystem,
+      executor,
       Some(jobManagerName),
       Some(archiveName),
       classOf[TestingJobManager],

http://git-wip-us.apache.org/repos/asf/flink/blob/7fb71c5b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
index b6eb7ba..af86983 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
@@ -129,6 +129,7 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
                        ActorRef jmActor = JobManager.startJobManagerActors(
                                jmConfig,
                                jmActorSystem,
+                               jmActorSystem.dispatcher(),
                                JobManager.class,
                                MemoryArchivist.class)._1();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7fb71c5b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
index b66fb5d..f72ef34 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
@@ -104,6 +104,7 @@ public class ProcessFailureCancelingITCase {
                        ActorRef jmActor = JobManager.startJobManagerActors(
                                jmConfig,
                                jmActorSystem,
+                               jmActorSystem.dispatcher(),
                                JobManager.class,
                                MemoryArchivist.class)._1();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7fb71c5b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
 
b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
index 7ca9c3e..0ed0d83 100644
--- 
a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
+++ 
b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.yarn
 
-import java.util.concurrent.ExecutorService
+import java.util.concurrent.Executor
 
 import akka.actor.ActorRef
 import org.apache.flink.configuration.Configuration
@@ -41,7 +41,7 @@ import scala.concurrent.duration.FiniteDuration
   * instead of an anonymous class with the respective mixin to obtain a more readable logger name.
   *
   * @param flinkConfiguration Configuration object for the actor
-  * @param executorService Execution context which is used to execute concurrent tasks in the
+  * @param executor Execution context which is used to execute concurrent tasks in the
   *                         [[org.apache.flink.runtime.executiongraph.ExecutionGraph]]
   * @param instanceManager Instance manager to manage the registered
   *                        [[org.apache.flink.runtime.taskmanager.TaskManager]]
@@ -54,7 +54,7 @@ import scala.concurrent.duration.FiniteDuration
   */
 class TestingYarnJobManager(
     flinkConfiguration: Configuration,
-    executorService: ExecutorService,
+    executor: Executor,
     instanceManager: InstanceManager,
     scheduler: Scheduler,
     libraryCacheManager: BlobLibraryCacheManager,
@@ -69,7 +69,7 @@ class TestingYarnJobManager(
     metricRegistry : Option[MetricRegistry])
   extends YarnJobManager(
     flinkConfiguration,
-    executorService,
+    executor,
     instanceManager,
     scheduler,
     libraryCacheManager,

http://git-wip-us.apache.org/repos/asf/flink/blob/7fb71c5b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index d19ddde..eb00992 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -28,6 +28,7 @@ import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.instance.Hardware;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
@@ -35,6 +36,7 @@ import org.apache.flink.runtime.process.ProcessReaper;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.runtime.util.NamedThreadFactory;
 import org.apache.flink.runtime.util.SignalHandler;
 import org.apache.flink.runtime.webmonitor.WebMonitor;
 
@@ -64,13 +66,15 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH;
 
 /**
  * This class is the executable entry point for the YARN application master.
- * It starts actor system and the actors for {@link org.apache.flink.runtime.jobmanager.JobManager}
+ * It starts actor system and the actors for {@link JobManager}
  * and {@link YarnFlinkResourceManager}.
  * 
  * The JobManager handles Flink job execution, while the YarnFlinkResourceManager handles container
@@ -174,6 +178,12 @@ public class YarnApplicationMasterRunner {
                ActorSystem actorSystem = null;
                WebMonitor webMonitor = null;
 
+               int numberProcessors = Hardware.getNumberCPUCores();
+
+               final ExecutorService executor = Executors.newFixedThreadPool(
+                       numberProcessors,
+                       new NamedThreadFactory("yarn-jobmanager-future-", "-thread-"));
+
                try {
                        // ------- (1) load and parse / validate all configurations -------
 
@@ -277,7 +287,9 @@ public class YarnApplicationMasterRunner {
 
                        // we start the JobManager with its standard name
                        ActorRef jobManager = JobManager.startJobManagerActors(
-                               config, actorSystem,
+                               config,
+                               actorSystem,
+                               executor,
                                new scala.Some<>(JobManager.JOB_MANAGER_NAME()),
                                scala.Option.<String>empty(),
                                getJobManagerClass(),
@@ -364,6 +376,9 @@ public class YarnApplicationMasterRunner {
                                LOG.error("Failed to stop the web frontend", t);
                        }
                }
+
+               executor.shutdownNow();
+
                return 0;
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7fb71c5b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala 
b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
index 94ad9f2..d7df66a 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
@@ -18,21 +18,20 @@
 
 package org.apache.flink.yarn
 
-import java.util.concurrent.{TimeUnit, ExecutorService}
+import java.util.concurrent.{Executor, TimeUnit}
 
 import akka.actor.ActorRef
-
 import org.apache.flink.api.common.JobID
-import org.apache.flink.configuration.{Configuration => FlinkConfiguration, ConfigConstants}
+import org.apache.flink.configuration.{ConfigConstants, Configuration => FlinkConfiguration}
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
 import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore
 import org.apache.flink.runtime.clusterframework.ApplicationStatus
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
 import org.apache.flink.runtime.clusterframework.messages._
 import org.apache.flink.runtime.jobgraph.JobStatus
-import org.apache.flink.runtime.jobmanager.{SubmittedJobGraphStore, JobManager}
+import org.apache.flink.runtime.jobmanager.{JobManager, SubmittedJobGraphStore}
 import org.apache.flink.runtime.leaderelection.LeaderElectionService
-import org.apache.flink.runtime.messages.JobManagerMessages.{RequestJobStatus, CurrentJobStatus, JobNotFound}
+import org.apache.flink.runtime.messages.JobManagerMessages.{CurrentJobStatus, JobNotFound, RequestJobStatus}
 import org.apache.flink.runtime.messages.Messages.Acknowledge
 import org.apache.flink.runtime.metrics.MetricRegistry
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
@@ -47,7 +46,7 @@ import scala.language.postfixOps
   * to start/administer/stop the Yarn session.
   *
   * @param flinkConfiguration Configuration object for the actor
-  * @param executorService Execution context which is used to execute concurrent tasks in the
+  * @param executor Execution context which is used to execute concurrent tasks in the
   *                         [[org.apache.flink.runtime.executiongraph.ExecutionGraph]]
   * @param instanceManager Instance manager to manage the registered
   *                        [[org.apache.flink.runtime.taskmanager.TaskManager]]
@@ -60,7 +59,7 @@ import scala.language.postfixOps
   */
 class YarnJobManager(
     flinkConfiguration: FlinkConfiguration,
-    executorService: ExecutorService,
+    executor: Executor,
     instanceManager: InstanceManager,
     scheduler: FlinkScheduler,
     libraryCacheManager: BlobLibraryCacheManager,
@@ -75,7 +74,7 @@ class YarnJobManager(
     metricsRegistry: Option[MetricRegistry])
   extends JobManager(
     flinkConfiguration,
-    executorService,
+    executor,
     instanceManager,
     scheduler,
     libraryCacheManager,

Reply via email to