Repository: flink Updated Branches: refs/heads/release-1.1 68585d145 -> cf4b22127
[FLINK-5082] Pull ExecutorService lifecycle management out of the JobManager The provided ExecutorService will no longer be closed by the JobManager. Instead, its lifecycle is managed outside of the JobManager, by the component that created it. This gives a cleaner design, because it better separates responsibilities. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7fb71c5b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7fb71c5b Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7fb71c5b Branch: refs/heads/release-1.1 Commit: 7fb71c5bf1aab85250bf29bd0ea0654079cea48f Parents: 68585d1 Author: Till Rohrmann <[email protected]> Authored: Wed Nov 16 18:33:54 2016 +0100 Committer: Till Rohrmann <[email protected]> Committed: Mon Nov 21 15:53:19 2016 +0100 ---------------------------------------------------------------------- .../BackPressureStatsTrackerITCase.java | 2 +- .../StackTraceSampleCoordinatorITCase.java | 2 +- .../webmonitor/WebRuntimeMonitorITCase.java | 1 + .../flink/runtime/util/NamedThreadFactory.java | 58 ++++++++++++++++ .../flink/runtime/jobmanager/JobManager.scala | 71 +++++++++++-------- .../runtime/minicluster/FlinkMiniCluster.scala | 25 ++++--- .../minicluster/LocalFlinkMiniCluster.scala | 1 + .../testingUtils/TestingJobManager.scala | 8 +-- .../runtime/jobmanager/JobManagerTest.java | 26 +++---- .../flink/runtime/jobmanager/JobSubmitTest.java | 7 +- .../resourcemanager/ClusterShutdownITCase.java | 4 +- .../resourcemanager/ResourceManagerITCase.java | 4 +- ...askManagerComponentsStartupShutdownTest.java | 1 + .../TaskManagerProcessReapingTestBase.java | 1 + .../TaskManagerRegistrationTest.java | 8 ++- .../jobmanager/JobManagerRegistrationTest.scala | 1 + .../runtime/testingUtils/TestingCluster.scala | 6 +- .../runtime/testingUtils/TestingUtils.scala | 72 +++++--------------- .../test/util/ForkableFlinkMiniCluster.scala | 1 + ...ctTaskManagerProcessFailureRecoveryTest.java | 1 + 
.../recovery/ProcessFailureCancelingITCase.java | 1 + .../flink/yarn/TestingYarnJobManager.scala | 8 +-- .../flink/yarn/YarnApplicationMasterRunner.java | 19 +++++- .../org/apache/flink/yarn/YarnJobManager.scala | 15 ++-- 24 files changed, 205 insertions(+), 138 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/7fb71c5b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java index 25dc189..9fbbd90 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java @@ -120,7 +120,7 @@ public class BackPressureStatsTrackerITCase extends TestLogger { } try { - jobManger = TestingUtils.createJobManager(testActorSystem, new Configuration()); + jobManger = TestingUtils.createJobManager(testActorSystem, testActorSystem.dispatcher(), new Configuration()); Configuration config = new Configuration(); config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism); http://git-wip-us.apache.org/repos/asf/flink/blob/7fb71c5b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java index 9b1f608..868dae1 100644 --- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java @@ -90,7 +90,7 @@ public class StackTraceSampleCoordinatorITCase extends TestLogger { ActorGateway taskManager = null; try { - jobManger = TestingUtils.createJobManager(testActorSystem, new Configuration()); + jobManger = TestingUtils.createJobManager(testActorSystem, testActorSystem.dispatcher(), new Configuration()); Configuration config = new Configuration(); config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism); http://git-wip-us.apache.org/repos/asf/flink/blob/7fb71c5b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java index 677ff54..a6b958a 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java @@ -178,6 +178,7 @@ public class WebRuntimeMonitorITCase extends TestLogger { jobManager[i] = JobManager.startJobManagerActors( jmConfig, jobManagerSystem[i], + jobManagerSystem[i].dispatcher(), JobManager.class, MemoryArchivist.class)._1(); http://git-wip-us.apache.org/repos/asf/flink/blob/7fb71c5b/flink-runtime/src/main/java/org/apache/flink/runtime/util/NamedThreadFactory.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/NamedThreadFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/NamedThreadFactory.java new file mode 100644 index 
0000000..bd97963 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/NamedThreadFactory.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.util; + +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Thread factory which allows to specify a thread pool name and a thread name. + * + * The code is based on {@link java.util.concurrent.Executors.DefaultThreadFactory}. + */ +public class NamedThreadFactory implements ThreadFactory { + private static final AtomicInteger poolNumber = new AtomicInteger(1); + private final ThreadGroup group; + private final AtomicInteger threadNumber = new AtomicInteger(1); + private final String namePrefix; + + public NamedThreadFactory(final String poolName, final String threadName) { + SecurityManager securityManager = System.getSecurityManager(); + group = (securityManager != null) ? 
securityManager.getThreadGroup() : + Thread.currentThread().getThreadGroup(); + + namePrefix = poolName + + poolNumber.getAndIncrement() + + threadName; + } + + @Override + public Thread newThread(Runnable runnable) { + Thread t = new Thread(group, runnable, + namePrefix + threadNumber.getAndIncrement(), + 0); + if (t.isDaemon()) { + t.setDaemon(false); + } + if (t.getPriority() != Thread.NORM_PRIORITY) { + t.setPriority(Thread.NORM_PRIORITY); + } + return t; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/7fb71c5b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 41218c9..c6e18e9 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -22,7 +22,7 @@ import java.io.{File, IOException} import java.lang.management.ManagementFactory import java.net._ import java.util.UUID -import java.util.concurrent.{ExecutorService, TimeUnit, TimeoutException} +import java.util.concurrent.{TimeUnit, Future => _, TimeoutException => _, _} import javax.management.ObjectName import akka.actor.Status.Failure @@ -50,7 +50,7 @@ import org.apache.flink.runtime.execution.SuppressRestartsException import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory import org.apache.flink.runtime.executiongraph.{ExecutionGraph, ExecutionGraphException, ExecutionJobVertex} -import org.apache.flink.runtime.instance.{AkkaActorGateway, InstanceManager} +import org.apache.flink.runtime.instance.{AkkaActorGateway, Hardware, InstanceManager} import 
org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus, JobVertexID} import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore.SubmittedJobGraphListener @@ -114,7 +114,7 @@ import scala.language.postfixOps */ class JobManager( protected val flinkConfiguration: Configuration, - protected val executorService: ExecutorService, + protected val executor: Executor, protected val instanceManager: InstanceManager, protected val scheduler: FlinkScheduler, protected val libraryCacheManager: BlobLibraryCacheManager, @@ -137,7 +137,7 @@ class JobManager( /** The extra execution context, for futures, with a custom logging reporter */ protected val executionContext: ExecutionContext = ExecutionContext.fromExecutor( - executorService, + executor, (t: Throwable) => { if (!context.system.isTerminated) { log.error("Executor could not execute task", t) @@ -277,9 +277,6 @@ class JobManager( case e: IOException => log.error("Could not properly shutdown the library cache manager.", e) } - // shut down the extra thread pool for futures - executorService.shutdown() - // failsafe shutdown of the metrics registry try { metricsRegistry.map(_.shutdown()) @@ -2013,15 +2010,29 @@ object JobManager { listeningPort: Int) : Unit = { - val (jobManagerSystem, _, _, webMonitorOption, _) = startActorSystemAndJobManagerActors( - configuration, - executionMode, - listeningAddress, - listeningPort, - classOf[JobManager], - classOf[MemoryArchivist], - Option(classOf[StandaloneResourceManager]) - ) + val numberProcessors = Hardware.getNumberCPUCores() + + val executor = Executors.newFixedThreadPool( + numberProcessors, + new NamedThreadFactory("jobmanager-future-", "-thread-")) + + val (jobManagerSystem, _, _, webMonitorOption, _) = try { + startActorSystemAndJobManagerActors( + configuration, + executionMode, + listeningAddress, + listeningPort, + executor, + classOf[JobManager], + classOf[MemoryArchivist], + 
Option(classOf[StandaloneResourceManager]) + ) + } catch { + case t: Throwable => + executor.shutdownNow() + + throw t + } // block until everything is shut down jobManagerSystem.awaitTermination() @@ -2035,6 +2046,8 @@ object JobManager { LOG.warn("Could not properly stop the web monitor.", t) } } + + executor.shutdownNow() } /** @@ -2142,6 +2155,7 @@ object JobManager { * additional TaskManager in the same process. * @param listeningAddress The hostname where the JobManager should listen for messages. * @param listeningPort The port where the JobManager should listen for messages + * @param executor to run the JobManager's futures * @param jobManagerClass The class of the JobManager to be started * @param archiveClass The class of the Archivist to be started * @param resourceManagerClass Optional class of resource manager if one should be started @@ -2153,6 +2167,7 @@ object JobManager { executionMode: JobManagerMode, listeningAddress: String, listeningPort: Int, + executor: Executor, jobManagerClass: Class[_ <: JobManager], archiveClass: Class[_ <: MemoryArchivist], resourceManagerClass: Option[Class[_ <: FlinkResourceManager[_]]]) @@ -2221,6 +2236,7 @@ object JobManager { val (jobManager, archive) = startJobManagerActors( configuration, jobManagerSystem, + executor, jobManagerClass, archiveClass) @@ -2424,15 +2440,16 @@ object JobManager { * delayBetweenRetries, timeout) * * @param configuration The configuration from which to parse the config values. + * @param executor to run JobManager's futures * @param leaderElectionServiceOption LeaderElectionService which shall be returned if the option * is defined * @return The members for a default JobManager. 
*/ def createJobManagerComponents( configuration: Configuration, + executor: Executor, leaderElectionServiceOption: Option[LeaderElectionService]) : - (ExecutorService, - InstanceManager, + (InstanceManager, FlinkScheduler, BlobLibraryCacheManager, RestartStrategyFactory, @@ -2462,13 +2479,11 @@ object JobManager { var instanceManager: InstanceManager = null var scheduler: FlinkScheduler = null var libraryCacheManager: BlobLibraryCacheManager = null - - val executorService: ExecutorService = new ForkJoinPool() try { blobServer = new BlobServer(configuration) instanceManager = new InstanceManager() - scheduler = new FlinkScheduler(ExecutionContext.fromExecutor(executorService)) + scheduler = new FlinkScheduler(ExecutionContext.fromExecutor(executor)) libraryCacheManager = new BlobLibraryCacheManager(blobServer, cleanupInterval) instanceManager.addInstanceListener(scheduler) @@ -2487,7 +2502,6 @@ object JobManager { if (blobServer != null) { blobServer.shutdown() } - executorService.shutdownNow() throw t } @@ -2542,8 +2556,7 @@ object JobManager { None } - (executorService, - instanceManager, + (instanceManager, scheduler, libraryCacheManager, restartStrategy, @@ -2570,6 +2583,7 @@ object JobManager { def startJobManagerActors( configuration: Configuration, actorSystem: ActorSystem, + executor: Executor, jobManagerClass: Class[_ <: JobManager], archiveClass: Class[_ <: MemoryArchivist]) : (ActorRef, ActorRef) = { @@ -2577,6 +2591,7 @@ object JobManager { startJobManagerActors( configuration, actorSystem, + executor, Some(JOB_MANAGER_NAME), Some(ARCHIVE_NAME), jobManagerClass, @@ -2589,6 +2604,7 @@ object JobManager { * * @param configuration The configuration for the JobManager * @param actorSystem The actor system running the JobManager + * @param executor to run JobManager's futures * @param jobManagerActorName Optionally the name of the JobManager actor. If none is given, * the actor will have the name generated by the actor system. 
* @param archiveActorName Optionally the name of the archive actor. If none is given, @@ -2600,14 +2616,14 @@ object JobManager { def startJobManagerActors( configuration: Configuration, actorSystem: ActorSystem, + executor: Executor, jobManagerActorName: Option[String], archiveActorName: Option[String], jobManagerClass: Class[_ <: JobManager], archiveClass: Class[_ <: MemoryArchivist]) : (ActorRef, ActorRef) = { - val (executorService: ExecutorService, - instanceManager, + val (instanceManager, scheduler, libraryCacheManager, restartStrategy, @@ -2620,6 +2636,7 @@ object JobManager { jobRecoveryTimeout, metricsRegistry) = createJobManagerComponents( configuration, + executor, None) val archiveProps = Props(archiveClass, archiveCount) @@ -2633,7 +2650,7 @@ object JobManager { val jobManagerProps = Props( jobManagerClass, configuration, - executorService, + executor, instanceManager, scheduler, libraryCacheManager, http://git-wip-us.apache.org/repos/asf/flink/blob/7fb71c5b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala index 5074b8c..271535e 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala @@ -20,26 +20,23 @@ package org.apache.flink.runtime.minicluster import java.net.InetAddress import java.util.UUID +import java.util.concurrent.Executors import akka.pattern.Patterns.gracefulStop import akka.pattern.ask import akka.actor.{ActorRef, ActorSystem} - import com.typesafe.config.Config - -import org.apache.flink.api.common.{JobID, JobExecutionResult, JobSubmissionResult} +import 
org.apache.flink.api.common.{JobExecutionResult, JobID, JobSubmissionResult} import org.apache.flink.configuration.{ConfigConstants, Configuration} import org.apache.flink.runtime.akka.AkkaUtils -import org.apache.flink.runtime.client.{JobExecutionException, JobClient} -import org.apache.flink.runtime.instance.{AkkaActorGateway, ActorGateway} +import org.apache.flink.runtime.client.{JobClient, JobExecutionException} +import org.apache.flink.runtime.instance.{ActorGateway, AkkaActorGateway, Hardware} import org.apache.flink.runtime.jobgraph.JobGraph import org.apache.flink.runtime.jobmanager.RecoveryMode -import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalService, LeaderRetrievalListener, -StandaloneLeaderRetrievalService} +import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalListener, LeaderRetrievalService, StandaloneLeaderRetrievalService} import org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegisteredAtJobManager -import org.apache.flink.runtime.util.ZooKeeperUtils -import org.apache.flink.runtime.webmonitor.{WebMonitorUtils, WebMonitor} - +import org.apache.flink.runtime.util.{NamedThreadFactory, ZooKeeperUtils} +import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils} import org.slf4j.LoggerFactory import scala.concurrent.duration.{Duration, FiniteDuration} @@ -109,6 +106,12 @@ abstract class FlinkMiniCluster( private var isRunning = false + val executor = Executors.newFixedThreadPool( + Hardware.getNumberCPUCores(), + new NamedThreadFactory("mini-cluster-future-", "-thread-")) + + + // -------------------------------------------------------------------------- // Abstract Methods // -------------------------------------------------------------------------- @@ -370,6 +373,8 @@ abstract class FlinkMiniCluster( jobManagerLeaderRetrievalService.foreach(_.stop()) isRunning = false + + executor.shutdownNow } protected def shutdown(): Unit = { 
http://git-wip-us.apache.org/repos/asf/flink/blob/7fb71c5b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala index 5bebd48..594997c 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala @@ -82,6 +82,7 @@ class LocalFlinkMiniCluster( val (jobManager, _) = JobManager.startJobManagerActors( config, system, + executor, Some(jobManagerName), Some(archiveName), classOf[JobManager], http://git-wip-us.apache.org/repos/asf/flink/blob/7fb71c5b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala index 16331ac..e9db9b0 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala @@ -19,7 +19,6 @@ package org.apache.flink.runtime.testingUtils import akka.actor.ActorRef - import org.apache.flink.configuration.Configuration import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore @@ -33,15 +32,14 @@ import org.apache.flink.runtime.metrics.MetricRegistry import scala.concurrent.duration._ import scala.language.postfixOps - -import java.util.concurrent.ExecutorService +import 
java.util.concurrent.Executor /** JobManager implementation extended by testing messages * */ class TestingJobManager( flinkConfiguration: Configuration, - executorService: ExecutorService, + executor: Executor, instanceManager: InstanceManager, scheduler: Scheduler, libraryCacheManager: BlobLibraryCacheManager, @@ -56,7 +54,7 @@ class TestingJobManager( metricRegistry : Option[MetricRegistry]) extends JobManager( flinkConfiguration, - executorService, + executor, instanceManager, scheduler, libraryCacheManager, http://git-wip-us.apache.org/repos/asf/flink/blob/7fb71c5b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java index d60e060..148e88f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java @@ -388,12 +388,13 @@ public class JobManagerTest extends TestLogger { actorSystem = AkkaUtils.createLocalActorSystem(new Configuration()); Tuple2<ActorRef, ActorRef> master = JobManager.startJobManagerActors( - config, - actorSystem, - Option.apply("jm"), - Option.apply("arch"), - TestingJobManager.class, - TestingMemoryArchivist.class); + config, + actorSystem, + actorSystem.dispatcher(), + Option.apply("jm"), + Option.apply("arch"), + TestingJobManager.class, + TestingMemoryArchivist.class); jobManager = new AkkaActorGateway(master._1(), null); archiver = new AkkaActorGateway(master._2(), null); @@ -481,12 +482,13 @@ public class JobManagerTest extends TestLogger { actorSystem = AkkaUtils.createLocalActorSystem(new Configuration()); Tuple2<ActorRef, ActorRef> master = JobManager.startJobManagerActors( - new Configuration(), - actorSystem, - 
Option.apply("jm"), - Option.apply("arch"), - TestingJobManager.class, - TestingMemoryArchivist.class); + new Configuration(), + actorSystem, + actorSystem.dispatcher(), + Option.apply("jm"), + Option.apply("arch"), + TestingJobManager.class, + TestingMemoryArchivist.class); jobManager = new AkkaActorGateway(master._1(), null); archiver = new AkkaActorGateway(master._2(), null); http://git-wip-us.apache.org/repos/asf/flink/blob/7fb71c5b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java index 536b729..42ed25b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java @@ -79,9 +79,10 @@ public class JobSubmitTest { // only start JobManager (no ResourceManager) JobManager.startJobManagerActors( config, - jobManagerSystem, - JobManager.class, - MemoryArchivist.class)._1(); + jobManagerSystem, + jobManagerSystem.dispatcher(), + JobManager.class, + MemoryArchivist.class)._1(); try { LeaderRetrievalService lrs = LeaderRetrievalUtils.createLeaderRetrievalService(config); http://git-wip-us.apache.org/repos/asf/flink/blob/7fb71c5b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ClusterShutdownITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ClusterShutdownITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ClusterShutdownITCase.java index 32c6cac..8530ce6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ClusterShutdownITCase.java +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ClusterShutdownITCase.java @@ -72,7 +72,7 @@ public class ClusterShutdownITCase extends TestLogger { // start job manager which doesn't shutdown the actor system ActorGateway jobManager = - TestingUtils.createJobManager(system, config, "jobmanager1"); + TestingUtils.createJobManager(system, system.dispatcher(), config, "jobmanager1"); // Tell the JobManager to inform us of shutdown actions jobManager.tell(TestingMessages.getNotifyOfComponentShutdown(), me); @@ -114,7 +114,7 @@ public class ClusterShutdownITCase extends TestLogger { // start job manager which doesn't shutdown the actor system ActorGateway jobManager = - TestingUtils.createJobManager(system, config, "jobmanager2"); + TestingUtils.createJobManager(system, system.dispatcher(), config, "jobmanager2"); // Tell the JobManager to inform us of shutdown actions jobManager.tell(TestingMessages.getNotifyOfComponentShutdown(), me); http://git-wip-us.apache.org/repos/asf/flink/blob/7fb71c5b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java index ca09634..bfc6abe 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java @@ -72,7 +72,7 @@ public class ResourceManagerITCase extends TestLogger { protected void run() { ActorGateway jobManager = - TestingUtils.createJobManager(system, config, "ReconciliationTest"); + TestingUtils.createJobManager(system, system.dispatcher(), config, "ReconciliationTest"); ActorGateway me = TestingUtils.createForwardingActor(system, getTestActor(), 
Option.<String>empty()); @@ -125,7 +125,7 @@ public class ResourceManagerITCase extends TestLogger { protected void run() { ActorGateway jobManager = - TestingUtils.createJobManager(system, config, "RegTest"); + TestingUtils.createJobManager(system, system.dispatcher(), config, "RegTest"); ActorGateway me = TestingUtils.createForwardingActor(system, getTestActor(), Option.<String>empty()); http://git-wip-us.apache.org/repos/asf/flink/blob/7fb71c5b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java index b4c456c..46bc7a5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java @@ -80,6 +80,7 @@ public class TaskManagerComponentsStartupShutdownTest { final ActorRef jobManager = JobManager.startJobManagerActors( config, actorSystem, + actorSystem.dispatcher(), JobManager.class, MemoryArchivist.class)._1(); http://git-wip-us.apache.org/repos/asf/flink/blob/7fb71c5b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java index c7913f7..63c1b29 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java @@ -102,6 +102,7 @@ public abstract class TaskManagerProcessReapingTestBase { ActorRef jmActor = JobManager.startJobManagerActors( new Configuration(), jmActorSystem, + jmActorSystem.dispatcher(), JobManager.class, MemoryArchivist.class)._1; http://git-wip-us.apache.org/repos/asf/flink/blob/7fb71c5b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java index e23aba7..52d500d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java @@ -112,7 +112,7 @@ public class TaskManagerRegistrationTest extends TestLogger { try { // a simple JobManager - jobManager = createJobManager(actorSystem, config); + jobManager = createJobManager(actorSystem, actorSystem.dispatcher(), config); startResourceManager(config, jobManager.actor()); // start two TaskManagers. 
it will automatically try to register @@ -193,8 +193,9 @@ public class TaskManagerRegistrationTest extends TestLogger { // now start the JobManager, with the regular akka URL jobManager = createJobManager( - actorSystem, - new Configuration()); + actorSystem, + actorSystem.dispatcher(), + new Configuration()); startResourceManager(config, jobManager.actor()); @@ -698,6 +699,7 @@ public class TaskManagerRegistrationTest extends TestLogger { return JobManager.startJobManagerActors( configuration, actorSystem, + actorSystem.dispatcher(), NONE_STRING, NONE_STRING, JobManager.class, http://git-wip-us.apache.org/repos/asf/flink/blob/7fb71c5b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala index 7feb949..b35cdb4 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala @@ -168,6 +168,7 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll { val (jm: ActorRef, _) = JobManager.startJobManagerActors( new Configuration(), _system, + _system.dispatcher, None, None, classOf[JobManager], http://git-wip-us.apache.org/repos/asf/flink/blob/7fb71c5b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala index b4ba40b..c3f846e 100644 --- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala @@ -96,8 +96,7 @@ class TestingCluster( config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort + index) } - val (executionContext, - instanceManager, + val (instanceManager, scheduler, libraryCacheManager, restartStrategyFactory, @@ -110,6 +109,7 @@ class TestingCluster( jobRecoveryTimeout, metricRegistry) = JobManager.createJobManagerComponents( config, + executor, createLeaderElectionService()) val testArchiveProps = Props(new TestingMemoryArchivist(archiveCount)) @@ -118,7 +118,7 @@ class TestingCluster( val jobManagerProps = Props( new TestingJobManager( configuration, - executionContext, + executor, instanceManager, scheduler, libraryCacheManager, http://git-wip-us.apache.org/repos/asf/flink/blob/7fb71c5b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala index 02a0fec..576993d 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala @@ -19,26 +19,25 @@ package org.apache.flink.runtime.testingUtils import java.util.UUID +import java.util.concurrent.Executor -import akka.actor.{Props, Kill, ActorSystem, ActorRef} +import akka.actor.{ActorRef, ActorSystem, Kill, Props} import akka.pattern.ask import com.google.common.util.concurrent.MoreExecutors - import com.typesafe.config.ConfigFactory import grizzled.slf4j.Logger import org.apache.flink.api.common.JobExecutionResult - import org.apache.flink.configuration.{ConfigConstants, Configuration} 
import org.apache.flink.runtime.client.JobClient import org.apache.flink.runtime.clusterframework.FlinkResourceManager import org.apache.flink.runtime.jobgraph.JobGraph import org.apache.flink.runtime.clusterframework.types.ResourceID -import org.apache.flink.runtime.jobmanager.{MemoryArchivist, JobManager} +import org.apache.flink.runtime.jobmanager.{JobManager, MemoryArchivist} import org.apache.flink.runtime.testutils.TestingResourceManager import org.apache.flink.runtime.util.LeaderRetrievalUtils -import org.apache.flink.runtime.{LogMessages, LeaderSessionMessageFilter, FlinkActor} +import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages} import org.apache.flink.runtime.akka.AkkaUtils -import org.apache.flink.runtime.instance.{AkkaActorGateway, ActorGateway} +import org.apache.flink.runtime.instance.{ActorGateway, AkkaActorGateway} import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService import org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegisteredAtJobManager import org.apache.flink.runtime.taskmanager.TaskManager @@ -304,15 +303,18 @@ object TestingUtils { /** Creates a testing JobManager using the default recovery mode (standalone) * * @param actorSystem The ActorSystem to use + * @param executor to run the JobManager's futures * @param configuration The Flink configuration * @return */ def createJobManager( actorSystem: ActorSystem, + executor: Executor, configuration: Configuration) : ActorGateway = { createJobManager( actorSystem, + executor, configuration, classOf[TestingJobManager], "" @@ -323,86 +325,43 @@ object TestingUtils { * Additional prefix can be supplied for the Actor system names * * @param actorSystem The ActorSystem to use + * @param executor to run the JobManager's futures * @param configuration The Flink configuration * @param prefix The prefix for the actor names * @return */ def createJobManager( actorSystem: ActorSystem, + executor: Executor, configuration: 
Configuration, prefix: String) : ActorGateway = { createJobManager( actorSystem, + executor, configuration, classOf[TestingJobManager], prefix ) } - def createJobManager( - actorSystem: ActorSystem, - configuration: Configuration, - executionContext: ExecutionContext) - : ActorGateway = { - - val (_, - instanceManager, - scheduler, - libraryCacheManager, - restartStrategy, - timeout, - archiveCount, - leaderElectionService, - submittedJobGraphs, - checkpointRecoveryFactory, - savepointStore, - jobRecoveryTimeout, - metricsRegistry) = JobManager.createJobManagerComponents( - configuration, - None - ) - - val archiveProps = Props(classOf[TestingMemoryArchivist], archiveCount) - - val archive: ActorRef = actorSystem.actorOf(archiveProps, JobManager.ARCHIVE_NAME) - - val jobManagerProps = Props( - classOf[TestingJobManager], - configuration, - executionContext, - instanceManager, - scheduler, - libraryCacheManager, - archive, - restartStrategy, - timeout, - leaderElectionService, - submittedJobGraphs, - checkpointRecoveryFactory, - jobRecoveryTimeout, - metricsRegistry) - - val jobManager: ActorRef = actorSystem.actorOf(jobManagerProps, JobManager.JOB_MANAGER_NAME) - - new AkkaActorGateway(jobManager, null) - } - /** * Creates a JobManager of the given class using the default recovery mode (standalone) * * @param actorSystem ActorSystem to use + * @param executor to run the JobManager's futures * @param configuration Configuration to use * @param jobManagerClass JobManager class to instantiate * @return */ def createJobManager( actorSystem: ActorSystem, + executor: Executor, configuration: Configuration, jobManagerClass: Class[_ <: JobManager]) : ActorGateway = { - createJobManager(actorSystem, configuration, jobManagerClass, "") + createJobManager(actorSystem, executor, configuration, jobManagerClass, "") } /** @@ -410,6 +369,7 @@ object TestingUtils { * Additional prefix for the Actor names can be added. 
* * @param actorSystem ActorSystem to use + * @param executor to run the JobManager's futures * @param configuration Configuration to use * @param jobManagerClass JobManager class to instantiate * @param prefix The prefix to use for the Actor names @@ -418,6 +378,7 @@ object TestingUtils { */ def createJobManager( actorSystem: ActorSystem, + executor: Executor, configuration: Configuration, jobManagerClass: Class[_ <: JobManager], prefix: String) @@ -428,6 +389,7 @@ object TestingUtils { val (actor, _) = JobManager.startJobManagerActors( configuration, actorSystem, + executor, Some(prefix + JobManager.JOB_MANAGER_NAME), Some(prefix + JobManager.ARCHIVE_NAME), jobManagerClass, http://git-wip-us.apache.org/repos/asf/flink/blob/7fb71c5b/flink-test-utils-parent/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala ---------------------------------------------------------------------- diff --git a/flink-test-utils-parent/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala b/flink-test-utils-parent/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala index 79c5a25..f2a4c5c 100644 --- a/flink-test-utils-parent/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala +++ b/flink-test-utils-parent/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala @@ -103,6 +103,7 @@ class ForkableFlinkMiniCluster( val (jobManager, _) = JobManager.startJobManagerActors( config, actorSystem, + executor, Some(jobManagerName), Some(archiveName), classOf[TestingJobManager], http://git-wip-us.apache.org/repos/asf/flink/blob/7fb71c5b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java ---------------------------------------------------------------------- diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java index b6eb7ba..af86983 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java @@ -129,6 +129,7 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test ActorRef jmActor = JobManager.startJobManagerActors( jmConfig, jmActorSystem, + jmActorSystem.dispatcher(), JobManager.class, MemoryArchivist.class)._1(); http://git-wip-us.apache.org/repos/asf/flink/blob/7fb71c5b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java index b66fb5d..f72ef34 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java @@ -104,6 +104,7 @@ public class ProcessFailureCancelingITCase { ActorRef jmActor = JobManager.startJobManagerActors( jmConfig, jmActorSystem, + jmActorSystem.dispatcher(), JobManager.class, MemoryArchivist.class)._1(); http://git-wip-us.apache.org/repos/asf/flink/blob/7fb71c5b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala index 
7ca9c3e..0ed0d83 100644 --- a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala +++ b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala @@ -18,7 +18,7 @@ package org.apache.flink.yarn -import java.util.concurrent.ExecutorService +import java.util.concurrent.Executor import akka.actor.ActorRef import org.apache.flink.configuration.Configuration @@ -41,7 +41,7 @@ import scala.concurrent.duration.FiniteDuration * instead of an anonymous class with the respective mixin to obtain a more readable logger name. * * @param flinkConfiguration Configuration object for the actor - * @param executorService Execution context which is used to execute concurrent tasks in the + * @param executor Execution context which is used to execute concurrent tasks in the * [[org.apache.flink.runtime.executiongraph.ExecutionGraph]] * @param instanceManager Instance manager to manage the registered * [[org.apache.flink.runtime.taskmanager.TaskManager]] @@ -54,7 +54,7 @@ import scala.concurrent.duration.FiniteDuration */ class TestingYarnJobManager( flinkConfiguration: Configuration, - executorService: ExecutorService, + executor: Executor, instanceManager: InstanceManager, scheduler: Scheduler, libraryCacheManager: BlobLibraryCacheManager, @@ -69,7 +69,7 @@ class TestingYarnJobManager( metricRegistry : Option[MetricRegistry]) extends YarnJobManager( flinkConfiguration, - executorService, + executor, instanceManager, scheduler, libraryCacheManager, http://git-wip-us.apache.org/repos/asf/flink/blob/7fb71c5b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java index d19ddde..eb00992 100644 --- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java @@ -28,6 +28,7 @@ import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.clusterframework.BootstrapTools; import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; +import org.apache.flink.runtime.instance.Hardware; import org.apache.flink.runtime.jobmanager.JobManager; import org.apache.flink.runtime.jobmanager.MemoryArchivist; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; @@ -35,6 +36,7 @@ import org.apache.flink.runtime.process.ProcessReaper; import org.apache.flink.runtime.taskmanager.TaskManager; import org.apache.flink.runtime.util.EnvironmentInformation; import org.apache.flink.runtime.util.LeaderRetrievalUtils; +import org.apache.flink.runtime.util.NamedThreadFactory; import org.apache.flink.runtime.util.SignalHandler; import org.apache.flink.runtime.webmonitor.WebMonitor; @@ -64,13 +66,15 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH; /** * This class is the executable entry point for the YARN application master. - * It starts actor system and the actors for {@link org.apache.flink.runtime.jobmanager.JobManager} + * It starts actor system and the actors for {@link JobManager} * and {@link YarnFlinkResourceManager}. 
* * The JobManager handles Flink job execution, while the YarnFlinkResourceManager handles container @@ -174,6 +178,12 @@ public class YarnApplicationMasterRunner { ActorSystem actorSystem = null; WebMonitor webMonitor = null; + int numberProcessors = Hardware.getNumberCPUCores(); + + final ExecutorService executor = Executors.newFixedThreadPool( + numberProcessors, + new NamedThreadFactory("yarn-jobmanager-future-", "-thread-")); + try { // ------- (1) load and parse / validate all configurations ------- @@ -277,7 +287,9 @@ public class YarnApplicationMasterRunner { // we start the JobManager with its standard name ActorRef jobManager = JobManager.startJobManagerActors( - config, actorSystem, + config, + actorSystem, + executor, new scala.Some<>(JobManager.JOB_MANAGER_NAME()), scala.Option.<String>empty(), getJobManagerClass(), @@ -364,6 +376,9 @@ public class YarnApplicationMasterRunner { LOG.error("Failed to stop the web frontend", t); } } + + executor.shutdownNow(); + return 0; } http://git-wip-us.apache.org/repos/asf/flink/blob/7fb71c5b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala index 94ad9f2..d7df66a 100644 --- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala +++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala @@ -18,21 +18,20 @@ package org.apache.flink.yarn -import java.util.concurrent.{TimeUnit, ExecutorService} +import java.util.concurrent.{Executor, TimeUnit} import akka.actor.ActorRef - import org.apache.flink.api.common.JobID -import org.apache.flink.configuration.{Configuration => FlinkConfiguration, ConfigConstants} +import org.apache.flink.configuration.{ConfigConstants, Configuration => FlinkConfiguration} import 
org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore import org.apache.flink.runtime.clusterframework.ApplicationStatus import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory import org.apache.flink.runtime.clusterframework.messages._ import org.apache.flink.runtime.jobgraph.JobStatus -import org.apache.flink.runtime.jobmanager.{SubmittedJobGraphStore, JobManager} +import org.apache.flink.runtime.jobmanager.{JobManager, SubmittedJobGraphStore} import org.apache.flink.runtime.leaderelection.LeaderElectionService -import org.apache.flink.runtime.messages.JobManagerMessages.{RequestJobStatus, CurrentJobStatus, JobNotFound} +import org.apache.flink.runtime.messages.JobManagerMessages.{CurrentJobStatus, JobNotFound, RequestJobStatus} import org.apache.flink.runtime.messages.Messages.Acknowledge import org.apache.flink.runtime.metrics.MetricRegistry import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager @@ -47,7 +46,7 @@ import scala.language.postfixOps * to start/administer/stop the Yarn session. 
* * @param flinkConfiguration Configuration object for the actor - * @param executorService Execution context which is used to execute concurrent tasks in the + * @param executor Execution context which is used to execute concurrent tasks in the * [[org.apache.flink.runtime.executiongraph.ExecutionGraph]] * @param instanceManager Instance manager to manage the registered * [[org.apache.flink.runtime.taskmanager.TaskManager]] @@ -60,7 +59,7 @@ import scala.language.postfixOps */ class YarnJobManager( flinkConfiguration: FlinkConfiguration, - executorService: ExecutorService, + executor: Executor, instanceManager: InstanceManager, scheduler: FlinkScheduler, libraryCacheManager: BlobLibraryCacheManager, @@ -75,7 +74,7 @@ class YarnJobManager( metricsRegistry: Option[MetricRegistry]) extends JobManager( flinkConfiguration, - executorService, + executor, instanceManager, scheduler, libraryCacheManager,
