[FLINK-5082] Pull ExecutorService lifecycle management out of the JobManager
The provided ExecutorService will no longer be closed by the JobManager. Instead, the lifecycle is managed outside of it where it was created. This will give a nicer behaviour, because it better separates responsibilities. This closes #2820. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ae4b274a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ae4b274a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ae4b274a Branch: refs/heads/master Commit: ae4b274a9919d01a236df4e819a0a07c5d8543ac Parents: 698e53e Author: Till Rohrmann <[email protected]> Authored: Wed Nov 16 18:33:54 2016 +0100 Committer: Till Rohrmann <[email protected]> Committed: Tue Nov 22 23:00:16 2016 +0100 ---------------------------------------------------------------------- .../MesosApplicationMasterRunner.java | 18 ++++- .../clusterframework/MesosJobManager.scala | 8 +-- .../BackPressureStatsTrackerITCase.java | 2 +- .../StackTraceSampleCoordinatorITCase.java | 2 +- .../webmonitor/WebRuntimeMonitorITCase.java | 1 + .../flink/runtime/util/NamedThreadFactory.java | 58 ++++++++++++++++ .../ContaineredJobManager.scala | 8 +-- .../flink/runtime/jobmanager/JobManager.scala | 73 ++++++++++++-------- .../runtime/minicluster/FlinkMiniCluster.scala | 9 ++- .../minicluster/LocalFlinkMiniCluster.scala | 10 +-- .../runtime/jobmanager/JobManagerTest.java | 48 +++++++------ .../flink/runtime/jobmanager/JobSubmitTest.java | 9 +-- .../resourcemanager/ClusterShutdownITCase.java | 4 +- .../resourcemanager/ResourceManagerITCase.java | 4 +- ...askManagerComponentsStartupShutdownTest.java | 1 + .../TaskManagerProcessReapingTestBase.java | 1 + .../TaskManagerRegistrationTest.java | 8 ++- .../jobmanager/JobManagerRegistrationTest.scala | 1 + .../testingUtils/TestingJobManager.scala | 6 +- .../runtime/testingUtils/TestingUtils.scala | 60 ++++------------ ...ctTaskManagerProcessFailureRecoveryTest.java | 1 + 
.../recovery/ProcessFailureCancelingITCase.java | 1 + .../flink/yarn/TestingYarnJobManager.scala | 9 ++- .../flink/yarn/YarnApplicationMasterRunner.java | 19 ++++- .../org/apache/flink/yarn/YarnJobManager.scala | 8 +-- 25 files changed, 229 insertions(+), 140 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/ae4b274a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java ---------------------------------------------------------------------- diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java index 5ec39c2..09f87bd 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java @@ -45,8 +45,10 @@ import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.process.ProcessReaper; import org.apache.flink.runtime.taskmanager.TaskManager; import org.apache.flink.runtime.util.EnvironmentInformation; +import org.apache.flink.runtime.util.Hardware; import org.apache.flink.runtime.util.JvmShutdownSafeguard; import org.apache.flink.runtime.util.LeaderRetrievalUtils; +import org.apache.flink.runtime.util.NamedThreadFactory; import org.apache.flink.runtime.util.SignalHandler; import org.apache.flink.runtime.webmonitor.WebMonitor; @@ -66,6 +68,8 @@ import java.net.URL; import java.security.PrivilegedAction; import java.util.Map; import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import static org.apache.flink.mesos.Utils.uri; @@ -75,7 +79,7 @@ import static 
org.apache.flink.util.Preconditions.checkState; /** * This class is the executable entry point for the Mesos Application Master. - * It starts actor system and the actors for {@link org.apache.flink.runtime.jobmanager.JobManager} + * It starts actor system and the actors for {@link JobManager} * and {@link MesosFlinkResourceManager}. * * The JobManager handles Flink job execution, while the MesosFlinkResourceManager handles container @@ -168,6 +172,12 @@ public class MesosApplicationMasterRunner { WebMonitor webMonitor = null; MesosArtifactServer artifactServer = null; + int numberProcessors = Hardware.getNumberCPUCores(); + + final ExecutorService executor = Executors.newFixedThreadPool( + numberProcessors, + new NamedThreadFactory("mesos-jobmanager-future-", "-thread-")); + try { // ------- (1) load and parse / validate all configurations ------- @@ -281,7 +291,9 @@ public class MesosApplicationMasterRunner { // we start the JobManager with its standard name ActorRef jobManager = JobManager.startJobManagerActors( - config, actorSystem, + config, + actorSystem, + executor, new scala.Some<>(JobManager.JOB_MANAGER_NAME()), scala.Option.<String>empty(), getJobManagerClass(), @@ -387,6 +399,8 @@ public class MesosApplicationMasterRunner { LOG.error("Failed to stop the artifact server", t); } + executor.shutdownNow(); + return 0; } http://git-wip-us.apache.org/repos/asf/flink/blob/ae4b274a/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala ---------------------------------------------------------------------- diff --git a/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala b/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala index 113ab85..300539c 100644 --- a/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala +++ 
b/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala @@ -18,7 +18,7 @@ package org.apache.flink.mesos.runtime.clusterframework -import java.util.concurrent.ExecutorService +import java.util.concurrent.Executor import akka.actor.ActorRef import org.apache.flink.configuration.{Configuration => FlinkConfiguration} @@ -37,7 +37,7 @@ import scala.concurrent.duration._ /** JobManager actor for execution on Mesos. . * * @param flinkConfiguration Configuration object for the actor - * @param executorService Execution context which is used to execute concurrent tasks in the + * @param executor Execution context which is used to execute concurrent tasks in the * [[org.apache.flink.runtime.executiongraph.ExecutionGraph]] * @param instanceManager Instance manager to manage the registered * [[org.apache.flink.runtime.taskmanager.TaskManager]] @@ -49,7 +49,7 @@ import scala.concurrent.duration._ * @param leaderElectionService LeaderElectionService to participate in the leader election */ class MesosJobManager(flinkConfiguration: FlinkConfiguration, - executorService: ExecutorService, + executor: Executor, instanceManager: InstanceManager, scheduler: FlinkScheduler, libraryCacheManager: BlobLibraryCacheManager, @@ -63,7 +63,7 @@ class MesosJobManager(flinkConfiguration: FlinkConfiguration, metricsRegistry: Option[FlinkMetricRegistry]) extends ContaineredJobManager( flinkConfiguration, - executorService, + executor, instanceManager, scheduler, libraryCacheManager, http://git-wip-us.apache.org/repos/asf/flink/blob/ae4b274a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java index 9d099e3..d01e0cf 
100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java @@ -120,7 +120,7 @@ public class BackPressureStatsTrackerITCase extends TestLogger { } try { - jobManger = TestingUtils.createJobManager(testActorSystem, new Configuration()); + jobManger = TestingUtils.createJobManager(testActorSystem, testActorSystem.dispatcher(), new Configuration()); final Configuration config = new Configuration(); config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism); http://git-wip-us.apache.org/repos/asf/flink/blob/ae4b274a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java index e012d0b..4b5bd2f 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java @@ -90,7 +90,7 @@ public class StackTraceSampleCoordinatorITCase extends TestLogger { ActorGateway taskManager = null; try { - jobManger = TestingUtils.createJobManager(testActorSystem, new Configuration()); + jobManger = TestingUtils.createJobManager(testActorSystem, testActorSystem.dispatcher(), new Configuration()); final Configuration config = new Configuration(); config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism); http://git-wip-us.apache.org/repos/asf/flink/blob/ae4b274a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java 
---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java index 1ae776c..fcdf94d 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java @@ -179,6 +179,7 @@ public class WebRuntimeMonitorITCase extends TestLogger { jobManager[i] = JobManager.startJobManagerActors( jmConfig, jobManagerSystem[i], + jobManagerSystem[i].dispatcher(), JobManager.class, MemoryArchivist.class)._1(); http://git-wip-us.apache.org/repos/asf/flink/blob/ae4b274a/flink-runtime/src/main/java/org/apache/flink/runtime/util/NamedThreadFactory.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/NamedThreadFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/NamedThreadFactory.java new file mode 100644 index 0000000..bd97963 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/NamedThreadFactory.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.util; + +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Thread factory which allows to specify a thread pool name and a thread name. + * + * The code is based on {@link java.util.concurrent.Executors.DefaultThreadFactory}. + */ +public class NamedThreadFactory implements ThreadFactory { + private static final AtomicInteger poolNumber = new AtomicInteger(1); + private final ThreadGroup group; + private final AtomicInteger threadNumber = new AtomicInteger(1); + private final String namePrefix; + + public NamedThreadFactory(final String poolName, final String threadName) { + SecurityManager securityManager = System.getSecurityManager(); + group = (securityManager != null) ? 
securityManager.getThreadGroup() : + Thread.currentThread().getThreadGroup(); + + namePrefix = poolName + + poolNumber.getAndIncrement() + + threadName; + } + + @Override + public Thread newThread(Runnable runnable) { + Thread t = new Thread(group, runnable, + namePrefix + threadNumber.getAndIncrement(), + 0); + if (t.isDaemon()) { + t.setDaemon(false); + } + if (t.getPriority() != Thread.NORM_PRIORITY) { + t.setPriority(Thread.NORM_PRIORITY); + } + return t; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/ae4b274a/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala index 72df671..0f31eba 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala @@ -18,7 +18,7 @@ package org.apache.flink.runtime.clusterframework -import java.util.concurrent.ExecutorService +import java.util.concurrent.Executor import akka.actor.ActorRef import org.apache.flink.api.common.JobID @@ -45,7 +45,7 @@ import scala.language.postfixOps * to start/administer/stop the session. 
* * @param flinkConfiguration Configuration object for the actor - * @param executorService Execution context which is used to execute concurrent tasks in the + * @param executor Execution context which is used to execute concurrent tasks in the * [[org.apache.flink.runtime.executiongraph.ExecutionGraph]] * @param instanceManager Instance manager to manage the registered * [[org.apache.flink.runtime.taskmanager.TaskManager]] @@ -58,7 +58,7 @@ import scala.language.postfixOps */ abstract class ContaineredJobManager( flinkConfiguration: Configuration, - executorService: ExecutorService, + executor: Executor, instanceManager: InstanceManager, scheduler: FlinkScheduler, libraryCacheManager: BlobLibraryCacheManager, @@ -72,7 +72,7 @@ abstract class ContaineredJobManager( metricsRegistry: Option[FlinkMetricRegistry]) extends JobManager( flinkConfiguration, - executorService, + executor, instanceManager, scheduler, libraryCacheManager, http://git-wip-us.apache.org/repos/asf/flink/blob/ae4b274a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index b2e1002..df80d72 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -21,7 +21,7 @@ package org.apache.flink.runtime.jobmanager import java.io.{File, IOException} import java.net._ import java.util.UUID -import java.util.concurrent.{ExecutorService, ForkJoinPool, TimeUnit, TimeoutException} +import java.util.concurrent.{TimeUnit, Future => _, TimeoutException => _, _} import akka.actor.Status.{Failure, Success} import akka.actor._ @@ -118,7 +118,7 @@ import scala.language.postfixOps */ class JobManager( protected val 
flinkConfiguration: Configuration, - protected val executorService: ExecutorService, + protected val executor: Executor, protected val instanceManager: InstanceManager, protected val scheduler: FlinkScheduler, protected val libraryCacheManager: BlobLibraryCacheManager, @@ -272,9 +272,6 @@ class JobManager( case e: IOException => log.error("Could not properly shutdown the library cache manager.", e) } - // shut down the extra thread pool for futures - executorService.shutdown() - // failsafe shutdown of the metrics registry try { metricsRegistry.foreach(_.shutdown()) @@ -1250,7 +1247,7 @@ class JobManager( executionGraph, jobGraph, flinkConfiguration, - executorService, + executor, userCodeLoader, checkpointRecoveryFactory, Time.of(timeout.length, timeout.unit), @@ -1971,15 +1968,29 @@ object JobManager { listeningPort: Int) : Unit = { - val (jobManagerSystem, _, _, webMonitorOption, _) = startActorSystemAndJobManagerActors( - configuration, - executionMode, - listeningAddress, - listeningPort, - classOf[JobManager], - classOf[MemoryArchivist], - Option(classOf[StandaloneResourceManager]) - ) + val numberProcessors = Hardware.getNumberCPUCores() + + val executor = Executors.newFixedThreadPool( + numberProcessors, + new NamedThreadFactory("jobmanager-future-", "-thread-")) + + val (jobManagerSystem, _, _, webMonitorOption, _) = try { + startActorSystemAndJobManagerActors( + configuration, + executionMode, + listeningAddress, + listeningPort, + executor, + classOf[JobManager], + classOf[MemoryArchivist], + Option(classOf[StandaloneResourceManager]) + ) + } catch { + case t: Throwable => + executor.shutdownNow() + + throw t + } // block until everything is shut down jobManagerSystem.awaitTermination() @@ -1993,6 +2004,8 @@ object JobManager { LOG.warn("Could not properly stop the web monitor.", t) } } + + executor.shutdownNow() } /** @@ -2100,6 +2113,7 @@ object JobManager { * additional TaskManager in the same process. 
* @param listeningAddress The hostname where the JobManager should listen for messages. * @param listeningPort The port where the JobManager should listen for messages + * @param executor to run the JobManager's futures * @param jobManagerClass The class of the JobManager to be started * @param archiveClass The class of the Archivist to be started * @param resourceManagerClass Optional class of resource manager if one should be started @@ -2111,6 +2125,7 @@ object JobManager { executionMode: JobManagerMode, listeningAddress: String, listeningPort: Int, + executor: Executor, jobManagerClass: Class[_ <: JobManager], archiveClass: Class[_ <: MemoryArchivist], resourceManagerClass: Option[Class[_ <: FlinkResourceManager[_]]]) @@ -2179,6 +2194,7 @@ object JobManager { val (jobManager, archive) = startJobManagerActors( configuration, jobManagerSystem, + executor, jobManagerClass, archiveClass) @@ -2379,15 +2395,16 @@ object JobManager { * delayBetweenRetries, timeout) * * @param configuration The configuration from which to parse the config values. + * @param executor to run JobManager's futures * @param leaderElectionServiceOption LeaderElectionService which shall be returned if the option * is defined * @return The members for a default JobManager. 
*/ def createJobManagerComponents( configuration: Configuration, + executor: Executor, leaderElectionServiceOption: Option[LeaderElectionService]) : - (ExecutorService, - InstanceManager, + (InstanceManager, FlinkScheduler, BlobLibraryCacheManager, RestartStrategyFactory, @@ -2416,13 +2433,11 @@ object JobManager { var instanceManager: InstanceManager = null var scheduler: FlinkScheduler = null var libraryCacheManager: BlobLibraryCacheManager = null - - val executorService: ExecutorService = new ForkJoinPool() try { blobServer = new BlobServer(configuration) instanceManager = new InstanceManager() - scheduler = new FlinkScheduler(ExecutionContext.fromExecutor(executorService)) + scheduler = new FlinkScheduler(ExecutionContext.fromExecutor(executor)) libraryCacheManager = new BlobLibraryCacheManager(blobServer, cleanupInterval) instanceManager.addInstanceListener(scheduler) @@ -2441,7 +2456,6 @@ object JobManager { if (blobServer != null) { blobServer.shutdown() } - executorService.shutdownNow() throw t } @@ -2494,8 +2508,7 @@ object JobManager { None } - (executorService, - instanceManager, + (instanceManager, scheduler, libraryCacheManager, restartStrategy, @@ -2521,6 +2534,7 @@ object JobManager { def startJobManagerActors( configuration: Configuration, actorSystem: ActorSystem, + executor: Executor, jobManagerClass: Class[_ <: JobManager], archiveClass: Class[_ <: MemoryArchivist]) : (ActorRef, ActorRef) = { @@ -2528,6 +2542,7 @@ object JobManager { startJobManagerActors( configuration, actorSystem, + executor, Some(JOB_MANAGER_NAME), Some(ARCHIVE_NAME), jobManagerClass, @@ -2540,6 +2555,7 @@ object JobManager { * * @param configuration The configuration for the JobManager * @param actorSystem The actor system running the JobManager + * @param executor to run JobManager's futures * @param jobManagerActorName Optionally the name of the JobManager actor. If none is given, * the actor will have the name generated by the actor system. 
* @param archiveActorName Optionally the name of the archive actor. If none is given, @@ -2551,14 +2567,14 @@ object JobManager { def startJobManagerActors( configuration: Configuration, actorSystem: ActorSystem, + executor: Executor, jobManagerActorName: Option[String], archiveActorName: Option[String], jobManagerClass: Class[_ <: JobManager], archiveClass: Class[_ <: MemoryArchivist]) : (ActorRef, ActorRef) = { - val (executorService: ExecutorService, - instanceManager, + val (instanceManager, scheduler, libraryCacheManager, restartStrategy, @@ -2570,6 +2586,7 @@ object JobManager { jobRecoveryTimeout, metricsRegistry) = createJobManagerComponents( configuration, + executor, None) val archiveProps = getArchiveProps(archiveClass, archiveCount) @@ -2583,7 +2600,7 @@ object JobManager { val jobManagerProps = getJobManagerProps( jobManagerClass, configuration, - executorService, + executor, instanceManager, scheduler, libraryCacheManager, @@ -2617,7 +2634,7 @@ object JobManager { def getJobManagerProps( jobManagerClass: Class[_ <: JobManager], configuration: Configuration, - executorService: ExecutorService, + executor: Executor, instanceManager: InstanceManager, scheduler: FlinkScheduler, libraryCacheManager: BlobLibraryCacheManager, @@ -2633,7 +2650,7 @@ object JobManager { Props( jobManagerClass, configuration, - executorService, + executor, instanceManager, scheduler, libraryCacheManager, http://git-wip-us.apache.org/repos/asf/flink/blob/ae4b274a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala index 048b013..d9a208d 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala +++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala @@ -20,6 +20,7 @@ package org.apache.flink.runtime.minicluster import java.net.InetAddress import java.util.UUID +import java.util.concurrent.{Executors, ForkJoinPool} import akka.pattern.Patterns.gracefulStop import akka.pattern.ask @@ -34,7 +35,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph import org.apache.flink.runtime.jobmanager.HighAvailabilityMode import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalListener, LeaderRetrievalService, StandaloneLeaderRetrievalService} import org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegisteredAtJobManager -import org.apache.flink.runtime.util.ZooKeeperUtils +import org.apache.flink.runtime.util.{Hardware, NamedThreadFactory, ZooKeeperUtils} import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils} import org.slf4j.LoggerFactory @@ -104,6 +105,10 @@ abstract class FlinkMiniCluster( private var isRunning = false + val executor = Executors.newFixedThreadPool( + Hardware.getNumberCPUCores(), + new NamedThreadFactory("mini-cluster-future-", "-thread")) + def configuration: Configuration = { if (originalConfiguration.getInteger( ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, @@ -399,6 +404,8 @@ abstract class FlinkMiniCluster( jobManagerLeaderRetrievalService.foreach(_.stop()) isRunning = false + + executor.shutdownNow } protected def shutdown(): Unit = { http://git-wip-us.apache.org/repos/asf/flink/blob/ae4b274a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala index 43ccda9..59dd399 100644 --- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala @@ -114,8 +114,7 @@ class LocalFlinkMiniCluster( config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort + index) } - val (executorService, - instanceManager, + val (instanceManager, scheduler, libraryCacheManager, restartStrategyFactory, @@ -125,7 +124,10 @@ class LocalFlinkMiniCluster( submittedJobGraphStore, checkpointRecoveryFactory, jobRecoveryTimeout, - metricsRegistry) = JobManager.createJobManagerComponents(config, createLeaderElectionService()) + metricsRegistry) = JobManager.createJobManagerComponents( + config, + executor, + createLeaderElectionService()) val archive = system.actorOf( getArchiveProps( @@ -137,7 +139,7 @@ class LocalFlinkMiniCluster( getJobManagerProps( jobManagerClass, config, - executorService, + executor, instanceManager, scheduler, libraryCacheManager, http://git-wip-us.apache.org/repos/asf/flink/blob/ae4b274a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java index ff604f1..aeb1ae1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java @@ -397,10 +397,11 @@ public class JobManagerTest { UUID leaderSessionId = null; ActorGateway jobManager = new AkkaActorGateway( JobManager.startJobManagerActors( - config, - system, - TestingJobManager.class, - MemoryArchivist.class)._1(), + config, + system, + system.dispatcher(), + TestingJobManager.class, + MemoryArchivist.class)._1(), leaderSessionId); LeaderRetrievalService 
leaderRetrievalService = new StandaloneLeaderRetrievalService( @@ -603,12 +604,13 @@ public class JobManagerTest { actorSystem = AkkaUtils.createLocalActorSystem(new Configuration()); Tuple2<ActorRef, ActorRef> master = JobManager.startJobManagerActors( - config, - actorSystem, - Option.apply("jm"), - Option.apply("arch"), - TestingJobManager.class, - TestingMemoryArchivist.class); + config, + actorSystem, + actorSystem.dispatcher(), + Option.apply("jm"), + Option.apply("arch"), + TestingJobManager.class, + TestingMemoryArchivist.class); jobManager = new AkkaActorGateway(master._1(), null); archiver = new AkkaActorGateway(master._2(), null); @@ -729,12 +731,13 @@ public class JobManagerTest { actorSystem = AkkaUtils.createLocalActorSystem(new Configuration()); Tuple2<ActorRef, ActorRef> master = JobManager.startJobManagerActors( - config, - actorSystem, - Option.apply("jm"), - Option.apply("arch"), - TestingJobManager.class, - TestingMemoryArchivist.class); + config, + actorSystem, + actorSystem.dispatcher(), + Option.apply("jm"), + Option.apply("arch"), + TestingJobManager.class, + TestingMemoryArchivist.class); jobManager = new AkkaActorGateway(master._1(), null); archiver = new AkkaActorGateway(master._2(), null); @@ -825,12 +828,13 @@ public class JobManagerTest { actorSystem = AkkaUtils.createLocalActorSystem(new Configuration()); Tuple2<ActorRef, ActorRef> master = JobManager.startJobManagerActors( - new Configuration(), - actorSystem, - Option.apply("jm"), - Option.apply("arch"), - TestingJobManager.class, - TestingMemoryArchivist.class); + new Configuration(), + actorSystem, + actorSystem.dispatcher(), + Option.apply("jm"), + Option.apply("arch"), + TestingJobManager.class, + TestingMemoryArchivist.class); jobManager = new AkkaActorGateway(master._1(), null); archiver = new AkkaActorGateway(master._2(), null); http://git-wip-us.apache.org/repos/asf/flink/blob/ae4b274a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java 
---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java index 9aeea3d..4f26e68 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java @@ -80,10 +80,11 @@ public class JobSubmitTest { // only start JobManager (no ResourceManager) JobManager.startJobManagerActors( - jmConfig, - jobManagerSystem, - JobManager.class, - MemoryArchivist.class)._1(); + jmConfig, + jobManagerSystem, + jobManagerSystem.dispatcher(), + JobManager.class, + MemoryArchivist.class)._1(); try { LeaderRetrievalService lrs = LeaderRetrievalUtils.createLeaderRetrievalService(jmConfig); http://git-wip-us.apache.org/repos/asf/flink/blob/ae4b274a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ClusterShutdownITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ClusterShutdownITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ClusterShutdownITCase.java index e0763f3..3c8ea75 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ClusterShutdownITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ClusterShutdownITCase.java @@ -72,7 +72,7 @@ public class ClusterShutdownITCase extends TestLogger { // start job manager which doesn't shutdown the actor system ActorGateway jobManager = - TestingUtils.createJobManager(system, config, "jobmanager1"); + TestingUtils.createJobManager(system, system.dispatcher(), config, "jobmanager1"); // Tell the JobManager to inform us of shutdown actions jobManager.tell(TestingMessages.getNotifyOfComponentShutdown(), me); @@ -114,7 
+114,7 @@ public class ClusterShutdownITCase extends TestLogger { // start job manager which doesn't shutdown the actor system ActorGateway jobManager = - TestingUtils.createJobManager(system, config, "jobmanager2"); + TestingUtils.createJobManager(system, system.dispatcher(), config, "jobmanager2"); // Tell the JobManager to inform us of shutdown actions jobManager.tell(TestingMessages.getNotifyOfComponentShutdown(), me); http://git-wip-us.apache.org/repos/asf/flink/blob/ae4b274a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java index c83ce58..5a98c8d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java @@ -75,7 +75,7 @@ public class ResourceManagerITCase extends TestLogger { protected void run() { ActorGateway jobManager = - TestingUtils.createJobManager(system, config, "ReconciliationTest"); + TestingUtils.createJobManager(system, system.dispatcher(), config, "ReconciliationTest"); ActorGateway me = TestingUtils.createForwardingActor(system, getTestActor(), Option.<String>empty()); @@ -129,7 +129,7 @@ public class ResourceManagerITCase extends TestLogger { protected void run() { ActorGateway jobManager = - TestingUtils.createJobManager(system, config, "RegTest"); + TestingUtils.createJobManager(system, system.dispatcher(), config, "RegTest"); ActorGateway me = TestingUtils.createForwardingActor(system, getTestActor(), Option.<String>empty()); 
http://git-wip-us.apache.org/repos/asf/flink/blob/ae4b274a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java index f9434e2..83eaddb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java @@ -85,6 +85,7 @@ public class TaskManagerComponentsStartupShutdownTest { final ActorRef jobManager = JobManager.startJobManagerActors( config, actorSystem, + actorSystem.dispatcher(), JobManager.class, MemoryArchivist.class)._1(); http://git-wip-us.apache.org/repos/asf/flink/blob/ae4b274a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java index c7913f7..63c1b29 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java @@ -102,6 +102,7 @@ public abstract class TaskManagerProcessReapingTestBase { ActorRef jmActor = JobManager.startJobManagerActors( new Configuration(), jmActorSystem, + jmActorSystem.dispatcher(), JobManager.class, MemoryArchivist.class)._1; 
http://git-wip-us.apache.org/repos/asf/flink/blob/ae4b274a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java index 53fa7c1..b21eba0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java @@ -106,7 +106,7 @@ public class TaskManagerRegistrationTest extends TestLogger { try { // a simple JobManager - jobManager = createJobManager(actorSystem, config); + jobManager = createJobManager(actorSystem, actorSystem.dispatcher(), config); startResourceManager(config, jobManager.actor()); // start two TaskManagers. it will automatically try to register @@ -187,8 +187,9 @@ public class TaskManagerRegistrationTest extends TestLogger { // now start the JobManager, with the regular akka URL jobManager = createJobManager( - actorSystem, - new Configuration()); + actorSystem, + actorSystem.dispatcher(), + new Configuration()); startResourceManager(config, jobManager.actor()); @@ -629,6 +630,7 @@ public class TaskManagerRegistrationTest extends TestLogger { return JobManager.startJobManagerActors( configuration, actorSystem, + actorSystem.dispatcher(), NONE_STRING, NONE_STRING, JobManager.class, http://git-wip-us.apache.org/repos/asf/flink/blob/ae4b274a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala index f9c9b63..4485b65 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala @@ -172,6 +172,7 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll { val (jm: ActorRef, _) = JobManager.startJobManagerActors( new Configuration(), _system, + _system.dispatcher, None, None, classOf[JobManager], http://git-wip-us.apache.org/repos/asf/flink/blob/ae4b274a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala index e9bdb99..c6fd923 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala @@ -18,7 +18,7 @@ package org.apache.flink.runtime.testingUtils -import java.util.concurrent.ExecutorService +import java.util.concurrent.{Executor, ExecutorService} import akka.actor.ActorRef import org.apache.flink.configuration.Configuration @@ -39,7 +39,7 @@ import scala.language.postfixOps */ class TestingJobManager( flinkConfiguration: Configuration, - executorService: ExecutorService, + executor: Executor, instanceManager: InstanceManager, scheduler: Scheduler, libraryCacheManager: BlobLibraryCacheManager, @@ -53,7 +53,7 @@ class TestingJobManager( metricRegistry : Option[MetricRegistry]) extends JobManager( flinkConfiguration, - executorService, + executor, instanceManager, scheduler, libraryCacheManager, 
http://git-wip-us.apache.org/repos/asf/flink/blob/ae4b274a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala index 73fb928..b57a9dc 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala @@ -303,15 +303,18 @@ object TestingUtils { /** Creates a testing JobManager using the default recovery mode (standalone) * * @param actorSystem The ActorSystem to use + * @param executor to run the JobManager's futures * @param configuration The Flink configuration * @return */ def createJobManager( actorSystem: ActorSystem, + executor: Executor, configuration: Configuration) : ActorGateway = { createJobManager( actorSystem, + executor, configuration, classOf[TestingJobManager], "" @@ -322,85 +325,43 @@ object TestingUtils { * Additional prefix can be supplied for the Actor system names * * @param actorSystem The ActorSystem to use + * @param executor to run the JobManager's futures * @param configuration The Flink configuration * @param prefix The prefix for the actor names * @return */ def createJobManager( actorSystem: ActorSystem, + executor: Executor, configuration: Configuration, prefix: String) : ActorGateway = { createJobManager( actorSystem, + executor, configuration, classOf[TestingJobManager], prefix ) } - def createJobManager( - actorSystem: ActorSystem, - configuration: Configuration, - executionContext: ExecutionContext) - : ActorGateway = { - - val (_, - instanceManager, - scheduler, - libraryCacheManager, - restartStrategy, - timeout, - archiveCount, - leaderElectionService, - submittedJobGraphs, - checkpointRecoveryFactory, - 
jobRecoveryTimeout, - metricsRegistry) = JobManager.createJobManagerComponents( - configuration, - None - ) - - val archiveProps = Props(classOf[TestingMemoryArchivist], archiveCount) - - val archive: ActorRef = actorSystem.actorOf(archiveProps, JobManager.ARCHIVE_NAME) - - val jobManagerProps = Props( - classOf[TestingJobManager], - configuration, - executionContext, - instanceManager, - scheduler, - libraryCacheManager, - archive, - restartStrategy, - timeout, - leaderElectionService, - submittedJobGraphs, - checkpointRecoveryFactory, - jobRecoveryTimeout, - metricsRegistry) - - val jobManager: ActorRef = actorSystem.actorOf(jobManagerProps, JobManager.JOB_MANAGER_NAME) - - new AkkaActorGateway(jobManager, null) - } - /** * Creates a JobManager of the given class using the default recovery mode (standalone) * * @param actorSystem ActorSystem to use + * @param executor to run the JobManager's futures * @param configuration Configuration to use * @param jobManagerClass JobManager class to instantiate * @return */ def createJobManager( actorSystem: ActorSystem, + executor: Executor, configuration: Configuration, jobManagerClass: Class[_ <: JobManager]) : ActorGateway = { - createJobManager(actorSystem, configuration, jobManagerClass, "") + createJobManager(actorSystem, executor, configuration, jobManagerClass, "") } /** @@ -408,6 +369,7 @@ object TestingUtils { * Additional prefix for the Actor names can be added. 
* * @param actorSystem ActorSystem to use + * @param executor to run the JobManager's futures * @param configuration Configuration to use * @param jobManagerClass JobManager class to instantiate * @param prefix The prefix to use for the Actor names @@ -415,6 +377,7 @@ object TestingUtils { */ def createJobManager( actorSystem: ActorSystem, + executor: Executor, configuration: Configuration, jobManagerClass: Class[_ <: JobManager], prefix: String) @@ -427,6 +390,7 @@ object TestingUtils { val (actor, _) = JobManager.startJobManagerActors( configuration, actorSystem, + executor, Some(prefix + JobManager.JOB_MANAGER_NAME), Some(prefix + JobManager.ARCHIVE_NAME), jobManagerClass, http://git-wip-us.apache.org/repos/asf/flink/blob/ae4b274a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java index b6eb7ba..af86983 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java @@ -129,6 +129,7 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test ActorRef jmActor = JobManager.startJobManagerActors( jmConfig, jmActorSystem, + jmActorSystem.dispatcher(), JobManager.class, MemoryArchivist.class)._1(); http://git-wip-us.apache.org/repos/asf/flink/blob/ae4b274a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java index b66fb5d..f72ef34 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java @@ -104,6 +104,7 @@ public class ProcessFailureCancelingITCase { ActorRef jmActor = JobManager.startJobManagerActors( jmConfig, jmActorSystem, + jmActorSystem.dispatcher(), JobManager.class, MemoryArchivist.class)._1(); http://git-wip-us.apache.org/repos/asf/flink/blob/ae4b274a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala index aef2604..aabc19d 100644 --- a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala +++ b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala @@ -18,12 +18,11 @@ package org.apache.flink.yarn -import java.util.concurrent.ExecutorService +import java.util.concurrent.Executor import akka.actor.ActorRef import org.apache.flink.configuration.Configuration import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory -import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory import org.apache.flink.runtime.instance.InstanceManager @@ -41,7 +40,7 @@ import scala.concurrent.duration.FiniteDuration * instead of an anonymous class with the respective mixin to obtain a more readable logger name. 
* * @param flinkConfiguration Configuration object for the actor - * @param executorService Execution context which is used to execute concurrent tasks in the + * @param executor Execution context which is used to execute concurrent tasks in the * [[org.apache.flink.runtime.executiongraph.ExecutionGraph]] * @param instanceManager Instance manager to manage the registered * [[org.apache.flink.runtime.taskmanager.TaskManager]] @@ -54,7 +53,7 @@ import scala.concurrent.duration.FiniteDuration */ class TestingYarnJobManager( flinkConfiguration: Configuration, - executorService: ExecutorService, + executor: Executor, instanceManager: InstanceManager, scheduler: Scheduler, libraryCacheManager: BlobLibraryCacheManager, @@ -68,7 +67,7 @@ class TestingYarnJobManager( metricRegistry : Option[MetricRegistry]) extends YarnJobManager( flinkConfiguration, - executorService, + executor, instanceManager, scheduler, libraryCacheManager, http://git-wip-us.apache.org/repos/asf/flink/blob/ae4b274a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java index 8e3418c..002e162 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java @@ -37,8 +37,10 @@ import org.apache.flink.runtime.process.ProcessReaper; import org.apache.flink.runtime.security.SecurityContext; import org.apache.flink.runtime.taskmanager.TaskManager; import org.apache.flink.runtime.util.EnvironmentInformation; +import org.apache.flink.runtime.util.Hardware; import org.apache.flink.runtime.util.JvmShutdownSafeguard; import org.apache.flink.runtime.util.LeaderRetrievalUtils; +import org.apache.flink.runtime.util.NamedThreadFactory; 
import org.apache.flink.runtime.util.SignalHandler; import org.apache.flink.runtime.webmonitor.WebMonitor; @@ -67,13 +69,15 @@ import java.util.Map; import java.util.HashMap; import java.util.UUID; import java.util.Collections; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH; /** * This class is the executable entry point for the YARN application master. - * It starts actor system and the actors for {@link org.apache.flink.runtime.jobmanager.JobManager} + * It starts actor system and the actors for {@link JobManager} * and {@link YarnFlinkResourceManager}. * * The JobManager handles Flink job execution, while the YarnFlinkResourceManager handles container @@ -209,6 +213,12 @@ public class YarnApplicationMasterRunner { ActorSystem actorSystem = null; WebMonitor webMonitor = null; + int numberProcessors = Hardware.getNumberCPUCores(); + + final ExecutorService executor = Executors.newFixedThreadPool( + numberProcessors, + new NamedThreadFactory("yarn-jobmanager-future-", "-thread-")); + try { // ------- (1) load and parse / validate all configurations ------- @@ -321,7 +331,9 @@ public class YarnApplicationMasterRunner { // we start the JobManager with its standard name ActorRef jobManager = JobManager.startJobManagerActors( - config, actorSystem, + config, + actorSystem, + executor, new scala.Some<>(JobManager.JOB_MANAGER_NAME()), scala.Option.<String>empty(), getJobManagerClass(), @@ -414,6 +426,9 @@ public class YarnApplicationMasterRunner { LOG.error("Failed to stop the web frontend", t); } } + + executor.shutdownNow(); + return 0; } http://git-wip-us.apache.org/repos/asf/flink/blob/ae4b274a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala 
b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala index 2df78c2..a81e6cf 100644 --- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala +++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala @@ -18,7 +18,7 @@ package org.apache.flink.yarn -import java.util.concurrent.{ExecutorService, TimeUnit} +import java.util.concurrent.{Executor, TimeUnit} import akka.actor.ActorRef import org.apache.flink.configuration.{ConfigConstants, Configuration => FlinkConfiguration} @@ -40,7 +40,7 @@ import scala.language.postfixOps * to start/administer/stop the Yarn session. * * @param flinkConfiguration Configuration object for the actor - * @param executorService Execution context which is used to execute concurrent tasks in the + * @param executor Execution context which is used to execute concurrent tasks in the * [[org.apache.flink.runtime.executiongraph.ExecutionGraph]] * @param instanceManager Instance manager to manage the registered * [[org.apache.flink.runtime.taskmanager.TaskManager]] @@ -53,7 +53,7 @@ import scala.language.postfixOps */ class YarnJobManager( flinkConfiguration: FlinkConfiguration, - executorService: ExecutorService, + executor: Executor, instanceManager: InstanceManager, scheduler: FlinkScheduler, libraryCacheManager: BlobLibraryCacheManager, @@ -67,7 +67,7 @@ class YarnJobManager( metricsRegistry: Option[MetricRegistry]) extends ContaineredJobManager( flinkConfiguration, - executorService, + executor, instanceManager, scheduler, libraryCacheManager,
