Repository: flink Updated Branches: refs/heads/master 698e53e47 -> c590912c9
[FLINK-5073] Use Executor to run ZooKeeper callbacks in ZooKeeperStateHandleStore Use a dedicated Executor to run ZooKeeper callbacks in ZooKeeperStateHandleStore instead of running them in the ZooKeeper client's thread. The callback can be blocking because it discards state, which might entail deleting files from disk. Introduce a dedicated Executor for blocking I/O operations. This closes #2815. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3fb92d86 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3fb92d86 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3fb92d86 Branch: refs/heads/master Commit: 3fb92d8687f03c1fac8b87396b2b5a7ff29f6dd6 Parents: ae4b274 Author: Till Rohrmann <[email protected]> Authored: Tue Nov 15 22:45:04 2016 +0100 Committer: Till Rohrmann <[email protected]> Committed: Tue Nov 22 23:00:16 2016 +0100 ---------------------------------------------------------------------- .../MesosApplicationMasterRunner.java | 12 +++- .../BackPressureStatsTrackerITCase.java | 6 +- .../StackTraceSampleCoordinatorITCase.java | 6 +- .../webmonitor/WebRuntimeMonitorITCase.java | 1 + .../ZooKeeperCheckpointRecoveryFactory.java | 12 +++- .../ZooKeeperCompletedCheckpointStore.java | 7 ++- .../ZooKeeperSubmittedJobGraphStore.java | 7 ++- .../flink/runtime/util/ZooKeeperUtils.java | 20 ++++--- .../RetrievableStateStorageHelper.java | 2 +- .../zookeeper/ZooKeeperStateHandleStore.java | 13 ++++- .../flink/runtime/jobmanager/JobManager.scala | 59 +++++++++++++------- .../runtime/minicluster/FlinkMiniCluster.scala | 9 ++- .../minicluster/LocalFlinkMiniCluster.scala | 5 +- ...ZooKeeperCompletedCheckpointStoreITCase.java | 3 +- .../runtime/jobmanager/JobManagerTest.java | 4 ++ .../flink/runtime/jobmanager/JobSubmitTest.java | 1 + .../ZooKeeperSubmittedJobGraphsStoreITCase.java | 14 +++-- .../resourcemanager/ClusterShutdownITCase.java | 14 ++++- .../resourcemanager/ResourceManagerITCase.java | 14 
++++- ...askManagerComponentsStartupShutdownTest.java | 1 + .../TaskManagerProcessReapingTestBase.java | 1 + .../TaskManagerRegistrationTest.java | 8 ++- .../ZooKeeperStateHandleStoreITCase.java | 31 +++++----- .../jobmanager/JobManagerRegistrationTest.scala | 1 + .../runtime/testingUtils/TestingUtils.scala | 35 ++++++++---- ...ctTaskManagerProcessFailureRecoveryTest.java | 1 + .../recovery/ProcessFailureCancelingITCase.java | 1 + .../flink/yarn/YarnApplicationMasterRunner.java | 12 +++- 28 files changed, 212 insertions(+), 88 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/3fb92d86/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java ---------------------------------------------------------------------- diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java index 09f87bd..166218f 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java @@ -174,10 +174,14 @@ public class MesosApplicationMasterRunner { int numberProcessors = Hardware.getNumberCPUCores(); - final ExecutorService executor = Executors.newFixedThreadPool( + final ExecutorService futureExecutor = Executors.newFixedThreadPool( numberProcessors, new NamedThreadFactory("mesos-jobmanager-future-", "-thread-")); + final ExecutorService ioExecutor = Executors.newFixedThreadPool( + numberProcessors, + new NamedThreadFactory("mesos-jobmanager-io-", "-thread-")); + try { // ------- (1) load and parse / validate all configurations ------- @@ -293,7 +297,8 @@ public class MesosApplicationMasterRunner { ActorRef jobManager = 
JobManager.startJobManagerActors( config, actorSystem, - executor, + futureExecutor, + ioExecutor, new scala.Some<>(JobManager.JOB_MANAGER_NAME()), scala.Option.<String>empty(), getJobManagerClass(), @@ -399,7 +404,8 @@ public class MesosApplicationMasterRunner { LOG.error("Failed to stop the artifact server", t); } - executor.shutdownNow(); + futureExecutor.shutdownNow(); + ioExecutor.shutdownNow(); return 0; } http://git-wip-us.apache.org/repos/asf/flink/blob/3fb92d86/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java index d01e0cf..357301a 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java @@ -120,7 +120,11 @@ public class BackPressureStatsTrackerITCase extends TestLogger { } try { - jobManger = TestingUtils.createJobManager(testActorSystem, testActorSystem.dispatcher(), new Configuration()); + jobManger = TestingUtils.createJobManager( + testActorSystem, + testActorSystem.dispatcher(), + testActorSystem.dispatcher(), + new Configuration()); final Configuration config = new Configuration(); config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism); http://git-wip-us.apache.org/repos/asf/flink/blob/3fb92d86/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java index 4b5bd2f..f31f41f 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java @@ -90,7 +90,11 @@ public class StackTraceSampleCoordinatorITCase extends TestLogger { ActorGateway taskManager = null; try { - jobManger = TestingUtils.createJobManager(testActorSystem, testActorSystem.dispatcher(), new Configuration()); + jobManger = TestingUtils.createJobManager( + testActorSystem, + testActorSystem.dispatcher(), + testActorSystem.dispatcher(), + new Configuration()); final Configuration config = new Configuration(); config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism); http://git-wip-us.apache.org/repos/asf/flink/blob/3fb92d86/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java index fcdf94d..853ef14 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java @@ -180,6 +180,7 @@ public class WebRuntimeMonitorITCase extends TestLogger { jmConfig, jobManagerSystem[i], jobManagerSystem[i].dispatcher(), + jobManagerSystem[i].dispatcher(), JobManager.class, MemoryArchivist.class)._1(); http://git-wip-us.apache.org/repos/asf/flink/blob/3fb92d86/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java 
---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java index f47012d..09bfa8c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java @@ -24,6 +24,8 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import org.apache.flink.runtime.util.ZooKeeperUtils; +import java.util.concurrent.Executor; + import static org.apache.flink.util.Preconditions.checkNotNull; /** @@ -35,9 +37,15 @@ public class ZooKeeperCheckpointRecoveryFactory implements CheckpointRecoveryFac private final Configuration config; - public ZooKeeperCheckpointRecoveryFactory(CuratorFramework client, Configuration config) { + private final Executor executor; + + public ZooKeeperCheckpointRecoveryFactory( + CuratorFramework client, + Configuration config, + Executor executor) { this.client = checkNotNull(client, "Curator client"); this.config = checkNotNull(config, "Configuration"); + this.executor = checkNotNull(executor, "Executor"); } @Override @@ -55,7 +63,7 @@ public class ZooKeeperCheckpointRecoveryFactory implements CheckpointRecoveryFac throws Exception { return ZooKeeperUtils.createCompletedCheckpoints(client, config, jobId, - NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN); + NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN, executor); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/3fb92d86/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java index 4f67921..4add504 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java @@ -37,6 +37,7 @@ import java.util.ArrayList; import java.util.ConcurrentModificationException; import java.util.List; import java.util.concurrent.Callable; +import java.util.concurrent.Executor; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -92,13 +93,15 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto * start with a '/') * @param stateStorage State storage to be used to persist the completed * checkpoint + * @param executor to give to the ZooKeeperStateHandleStore to run ZooKeeper callbacks * @throws Exception */ public ZooKeeperCompletedCheckpointStore( int maxNumberOfCheckpointsToRetain, CuratorFramework client, String checkpointsPath, - RetrievableStateStorageHelper<CompletedCheckpoint> stateStorage) throws Exception { + RetrievableStateStorageHelper<CompletedCheckpoint> stateStorage, + Executor executor) throws Exception { checkArgument(maxNumberOfCheckpointsToRetain >= 1, "Must retain at least one checkpoint."); checkNotNull(stateStorage, "State storage"); @@ -115,7 +118,7 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto // All operations will have the path as root this.client = client.usingNamespace(client.getNamespace() + checkpointsPath); - this.checkpointsInZooKeeper = new ZooKeeperStateHandleStore<>(this.client, stateStorage); + this.checkpointsInZooKeeper = new ZooKeeperStateHandleStore<>(this.client, stateStorage, executor); 
this.checkpointStateHandles = new ArrayDeque<>(maxNumberOfCheckpointsToRetain + 1); http://git-wip-us.apache.org/repos/asf/flink/blob/3fb92d86/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java index ec05f1e..b241712 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java @@ -39,6 +39,7 @@ import java.util.ConcurrentModificationException; import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.Executor; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; @@ -93,12 +94,14 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore { * @param client ZooKeeper client * @param currentJobsPath ZooKeeper path for current job graphs * @param stateStorage State storage used to persist the submitted jobs + * @param executor to give to the ZooKeeperStateHandleStore to run ZooKeeper callbacks * @throws Exception */ public ZooKeeperSubmittedJobGraphStore( CuratorFramework client, String currentJobsPath, - RetrievableStateStorageHelper<SubmittedJobGraph> stateStorage) throws Exception { + RetrievableStateStorageHelper<SubmittedJobGraph> stateStorage, + Executor executor) throws Exception { checkNotNull(currentJobsPath, "Current jobs path"); checkNotNull(stateStorage, "State storage"); @@ -114,7 +117,7 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore { // All operations will have the path as root CuratorFramework 
facade = client.usingNamespace(client.getNamespace() + currentJobsPath); - this.jobGraphsInZooKeeper = new ZooKeeperStateHandleStore<>(facade, stateStorage); + this.jobGraphsInZooKeeper = new ZooKeeperStateHandleStore<>(facade, stateStorage, executor); this.pathCache = new PathChildrenCache(facade, "/", false); pathCache.getListenable().addListener(new SubmittedJobGraphsPathCacheListener()); http://git-wip-us.apache.org/repos/asf/flink/blob/3fb92d86/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java index a9887a6..70ac6c8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java @@ -49,6 +49,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.Serializable; import java.util.List; +import java.util.concurrent.Executor; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -216,12 +217,14 @@ public class ZooKeeperUtils { * * @param client The {@link CuratorFramework} ZooKeeper client to use * @param configuration {@link Configuration} object + * @param executor to run ZooKeeper callbacks * @return {@link ZooKeeperSubmittedJobGraphStore} instance * @throws Exception if the submitted job graph store cannot be created */ public static ZooKeeperSubmittedJobGraphStore createSubmittedJobGraphs( CuratorFramework client, - Configuration configuration) throws Exception { + Configuration configuration, + Executor executor) throws Exception { checkNotNull(configuration, "Configuration"); @@ -235,7 +238,7 @@ public class ZooKeeperUtils { ConfigConstants.ZOOKEEPER_JOBGRAPHS_PATH); return new ZooKeeperSubmittedJobGraphStore( - client, zooKeeperSubmittedJobsPath, 
stateStorage); + client, zooKeeperSubmittedJobsPath, stateStorage, executor); } /** @@ -245,6 +248,7 @@ public class ZooKeeperUtils { * @param configuration {@link Configuration} object * @param jobId ID of job to create the instance for * @param maxNumberOfCheckpointsToRetain The maximum number of checkpoints to retain + * @param executor to run ZooKeeper callbacks * @return {@link ZooKeeperCompletedCheckpointStore} instance * @throws Exception if the completed checkpoint store cannot be created */ @@ -252,7 +256,8 @@ public class ZooKeeperUtils { CuratorFramework client, Configuration configuration, JobID jobId, - int maxNumberOfCheckpointsToRetain) throws Exception { + int maxNumberOfCheckpointsToRetain, + Executor executor) throws Exception { checkNotNull(configuration, "Configuration"); @@ -269,10 +274,11 @@ public class ZooKeeperUtils { checkpointsPath += ZooKeeperSubmittedJobGraphStore.getPathForJob(jobId); return new ZooKeeperCompletedCheckpointStore( - maxNumberOfCheckpointsToRetain, - client, - checkpointsPath, - stateStorage); + maxNumberOfCheckpointsToRetain, + client, + checkpointsPath, + stateStorage, + executor); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/3fb92d86/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/RetrievableStateStorageHelper.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/RetrievableStateStorageHelper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/RetrievableStateStorageHelper.java index 1434f74..f6acea3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/RetrievableStateStorageHelper.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/RetrievableStateStorageHelper.java @@ -23,7 +23,7 @@ import org.apache.flink.runtime.state.RetrievableStateHandle; import java.io.Serializable; /** - * State storage helper which is used by {@link 
ZooKeeperStateHandleStore} to persiste state before + * State storage helper which is used by {@link ZooKeeperStateHandleStore} to persist state before * the state handle is written to ZooKeeper. * * @param <T> The type of the data that can be stored by this storage helper. http://git-wip-us.apache.org/repos/asf/flink/blob/3fb92d86/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java index 5623715..14d9f6f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java @@ -30,10 +30,10 @@ import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.Executor; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -75,6 +75,8 @@ public class ZooKeeperStateHandleStore<T extends Serializable> { private final RetrievableStateStorageHelper<T> storage; + private final Executor executor; + /** * Creates a {@link ZooKeeperStateHandleStore}. * @@ -82,13 +84,18 @@ public class ZooKeeperStateHandleStore<T extends Serializable> { * expected that the client's namespace ensures that the root * path is exclusive for all state handles managed by this * instance, e.g. 
<code>client.usingNamespace("/stateHandles")</code> + * @param storage to persist the actual state and whose returned state handle is then written + * to ZooKeeper + * @param executor to run the ZooKeeper callbacks */ public ZooKeeperStateHandleStore( CuratorFramework client, - RetrievableStateStorageHelper<T> storage) throws IOException { + RetrievableStateStorageHelper<T> storage, + Executor executor) { this.client = checkNotNull(client, "Curator client"); this.storage = checkNotNull(storage, "State storage"); + this.executor = checkNotNull(executor); } /** @@ -348,7 +355,7 @@ public class ZooKeeperStateHandleStore<T extends Serializable> { checkNotNull(pathInZooKeeper, "Path in ZooKeeper"); checkNotNull(callback, "Background callback"); - client.delete().deletingChildrenIfNeeded().inBackground(callback).forPath(pathInZooKeeper); + client.delete().deletingChildrenIfNeeded().inBackground(callback, executor).forPath(pathInZooKeeper); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/3fb92d86/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index df80d72..08ed0a4 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -1970,24 +1970,30 @@ object JobManager { val numberProcessors = Hardware.getNumberCPUCores() - val executor = Executors.newFixedThreadPool( + val futureExecutor = Executors.newFixedThreadPool( numberProcessors, new NamedThreadFactory("jobmanager-future-", "-thread-")) + val ioExecutor = Executors.newFixedThreadPool( + numberProcessors, + new NamedThreadFactory("jobmanager-io-", "-thread-") + ) + val (jobManagerSystem, _, _, webMonitorOption, 
_) = try { startActorSystemAndJobManagerActors( configuration, executionMode, listeningAddress, listeningPort, - executor, + futureExecutor, + ioExecutor, classOf[JobManager], classOf[MemoryArchivist], Option(classOf[StandaloneResourceManager]) ) } catch { case t: Throwable => - executor.shutdownNow() + futureExecutor.shutdownNow() throw t } @@ -2005,7 +2011,8 @@ object JobManager { } } - executor.shutdownNow() + futureExecutor.shutdownNow() + ioExecutor.shutdownNow() } /** @@ -2113,7 +2120,8 @@ object JobManager { * additional TaskManager in the same process. * @param listeningAddress The hostname where the JobManager should listen for messages. * @param listeningPort The port where the JobManager should listen for messages - * @param executor to run the JobManager's futures + * @param futureExecutor to run the JobManager's futures + * @param ioExecutor to run blocking io operations * @param jobManagerClass The class of the JobManager to be started * @param archiveClass The class of the Archivist to be started * @param resourceManagerClass Optional class of resource manager if one should be started @@ -2125,7 +2133,8 @@ object JobManager { executionMode: JobManagerMode, listeningAddress: String, listeningPort: Int, - executor: Executor, + futureExecutor: Executor, + ioExecutor: Executor, jobManagerClass: Class[_ <: JobManager], archiveClass: Class[_ <: MemoryArchivist], resourceManagerClass: Option[Class[_ <: FlinkResourceManager[_]]]) @@ -2194,7 +2203,8 @@ object JobManager { val (jobManager, archive) = startJobManagerActors( configuration, jobManagerSystem, - executor, + futureExecutor, + ioExecutor, jobManagerClass, archiveClass) @@ -2395,14 +2405,16 @@ object JobManager { * delayBetweenRetries, timeout) * * @param configuration The configuration from which to parse the config values. 
- * @param executor to run JobManager's futures + * @param futureExecutor to run JobManager's futures + * @param ioExecutor to run blocking io operations * @param leaderElectionServiceOption LeaderElectionService which shall be returned if the option * is defined * @return The members for a default JobManager. */ def createJobManagerComponents( configuration: Configuration, - executor: Executor, + futureExecutor: Executor, + ioExecutor: Executor, leaderElectionServiceOption: Option[LeaderElectionService]) : (InstanceManager, FlinkScheduler, @@ -2433,11 +2445,11 @@ object JobManager { var instanceManager: InstanceManager = null var scheduler: FlinkScheduler = null var libraryCacheManager: BlobLibraryCacheManager = null - + try { blobServer = new BlobServer(configuration) instanceManager = new InstanceManager() - scheduler = new FlinkScheduler(ExecutionContext.fromExecutor(executor)) + scheduler = new FlinkScheduler(ExecutionContext.fromExecutor(futureExecutor)) libraryCacheManager = new BlobLibraryCacheManager(blobServer, cleanupInterval) instanceManager.addInstanceListener(scheduler) @@ -2482,8 +2494,8 @@ object JobManager { } (leaderElectionService, - ZooKeeperUtils.createSubmittedJobGraphs(client, configuration), - new ZooKeeperCheckpointRecoveryFactory(client, configuration)) + ZooKeeperUtils.createSubmittedJobGraphs(client, configuration, ioExecutor), + new ZooKeeperCheckpointRecoveryFactory(client, configuration, ioExecutor)) } val jobRecoveryTimeoutStr = configuration.getValue(HighAvailabilityOptions.HA_JOB_DELAY) @@ -2527,14 +2539,17 @@ object JobManager { * * @param configuration The configuration for the JobManager * @param actorSystem The actor system running the JobManager + * @param futureExecutor to run JobManager's futures + * @param ioExecutor to run blocking io operations * @param jobManagerClass The class of the JobManager to be started * @param archiveClass The class of the MemoryArchivist to be started - * @return A tuple of references 
(JobManager Ref, Archiver Ref) + * @return A tuple of references (JobManager Ref, Archiver Ref) */ def startJobManagerActors( configuration: Configuration, actorSystem: ActorSystem, - executor: Executor, + futureExecutor: Executor, + ioExecutor: Executor, jobManagerClass: Class[_ <: JobManager], archiveClass: Class[_ <: MemoryArchivist]) : (ActorRef, ActorRef) = { @@ -2542,7 +2557,8 @@ object JobManager { startJobManagerActors( configuration, actorSystem, - executor, + futureExecutor, + ioExecutor, Some(JOB_MANAGER_NAME), Some(ARCHIVE_NAME), jobManagerClass, @@ -2555,7 +2571,8 @@ object JobManager { * * @param configuration The configuration for the JobManager * @param actorSystem The actor system running the JobManager - * @param executor to run JobManager's futures + * @param futureExecutor to run JobManager's futures + * @param ioExecutor to run blocking io operations * @param jobManagerActorName Optionally the name of the JobManager actor. If none is given, * the actor will have the name generated by the actor system. * @param archiveActorName Optionally the name of the archive actor. 
If none is given, @@ -2567,7 +2584,8 @@ object JobManager { def startJobManagerActors( configuration: Configuration, actorSystem: ActorSystem, - executor: Executor, + futureExecutor: Executor, + ioExecutor: Executor, jobManagerActorName: Option[String], archiveActorName: Option[String], jobManagerClass: Class[_ <: JobManager], @@ -2586,7 +2604,8 @@ object JobManager { jobRecoveryTimeout, metricsRegistry) = createJobManagerComponents( configuration, - executor, + futureExecutor, + ioExecutor, None) val archiveProps = getArchiveProps(archiveClass, archiveCount) @@ -2600,7 +2619,7 @@ object JobManager { val jobManagerProps = getJobManagerProps( jobManagerClass, configuration, - executor, + futureExecutor, instanceManager, scheduler, libraryCacheManager, http://git-wip-us.apache.org/repos/asf/flink/blob/3fb92d86/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala index d9a208d..4367442 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala @@ -105,7 +105,11 @@ abstract class FlinkMiniCluster( private var isRunning = false - val executor = Executors.newFixedThreadPool( + val futureExecutor = Executors.newFixedThreadPool( + Hardware.getNumberCPUCores(), + new NamedThreadFactory("mini-cluster-future-", "-thread")) + + val ioExecutor = Executors.newFixedThreadPool( Hardware.getNumberCPUCores(), new NamedThreadFactory("mini-cluster-future-", "-thread")) @@ -405,7 +409,8 @@ abstract class FlinkMiniCluster( jobManagerLeaderRetrievalService.foreach(_.stop()) isRunning = false - executor.shutdownNow + futureExecutor.shutdownNow() + 
ioExecutor.shutdownNow() } protected def shutdown(): Unit = { http://git-wip-us.apache.org/repos/asf/flink/blob/3fb92d86/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala index 59dd399..b2aedf7 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala @@ -126,7 +126,8 @@ class LocalFlinkMiniCluster( jobRecoveryTimeout, metricsRegistry) = JobManager.createJobManagerComponents( config, - executor, + futureExecutor, + ioExecutor, createLeaderElectionService()) val archive = system.actorOf( @@ -139,7 +140,7 @@ class LocalFlinkMiniCluster( getJobManagerProps( jobManagerClass, config, - executor, + futureExecutor, instanceManager, scheduler, libraryCacheManager, http://git-wip-us.apache.org/repos/asf/flink/blob/3fb92d86/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java index 2e44ecd..f46f7d2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.checkpoint; import org.apache.curator.framework.CuratorFramework; +import 
org.apache.flink.runtime.concurrent.Executors; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.state.RetrievableStateHandle; import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper; @@ -68,7 +69,7 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint public RetrievableStateHandle<CompletedCheckpoint> store(CompletedCheckpoint state) throws Exception { return new HeapRetrievableStateHandle<CompletedCheckpoint>(state); } - }); + }, Executors.directExecutor()); } // --------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/3fb92d86/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java index aeb1ae1..f941c24 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java @@ -400,6 +400,7 @@ public class JobManagerTest { config, system, system.dispatcher(), + system.dispatcher(), TestingJobManager.class, MemoryArchivist.class)._1(), leaderSessionId); @@ -607,6 +608,7 @@ public class JobManagerTest { config, actorSystem, actorSystem.dispatcher(), + actorSystem.dispatcher(), Option.apply("jm"), Option.apply("arch"), TestingJobManager.class, @@ -734,6 +736,7 @@ public class JobManagerTest { config, actorSystem, actorSystem.dispatcher(), + actorSystem.dispatcher(), Option.apply("jm"), Option.apply("arch"), TestingJobManager.class, @@ -831,6 +834,7 @@ public class JobManagerTest { new Configuration(), actorSystem, actorSystem.dispatcher(), + actorSystem.dispatcher(), Option.apply("jm"), Option.apply("arch"), 
TestingJobManager.class, http://git-wip-us.apache.org/repos/asf/flink/blob/3fb92d86/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java index 4f26e68..1b8f0c3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java @@ -83,6 +83,7 @@ public class JobSubmitTest { jmConfig, jobManagerSystem, jobManagerSystem.dispatcher(), + jobManagerSystem.dispatcher(), JobManager.class, MemoryArchivist.class)._1(); http://git-wip-us.apache.org/repos/asf/flink/blob/3fb92d86/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java index 7d21cfd..d156f02 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.jobmanager; import akka.actor.ActorRef; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.akka.ListeningBehaviour; +import org.apache.flink.runtime.concurrent.Executors; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore.SubmittedJobGraphListener; @@ -89,7 +90,8 @@ 
public class ZooKeeperSubmittedJobGraphsStoreITCase extends TestLogger { ZooKeeperSubmittedJobGraphStore jobGraphs = new ZooKeeperSubmittedJobGraphStore( ZooKeeper.createClient(), "/testPutAndRemoveJobGraph", - localStateStorage); + localStateStorage, + Executors.directExecutor()); try { SubmittedJobGraphListener listener = mock(SubmittedJobGraphListener.class); @@ -141,7 +143,7 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase extends TestLogger { @Test public void testRecoverJobGraphs() throws Exception { ZooKeeperSubmittedJobGraphStore jobGraphs = new ZooKeeperSubmittedJobGraphStore( - ZooKeeper.createClient(), "/testRecoverJobGraphs", localStateStorage); + ZooKeeper.createClient(), "/testRecoverJobGraphs", localStateStorage, Executors.directExecutor()); try { SubmittedJobGraphListener listener = mock(SubmittedJobGraphListener.class); @@ -191,10 +193,10 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase extends TestLogger { try { jobGraphs = new ZooKeeperSubmittedJobGraphStore( - ZooKeeper.createClient(), "/testConcurrentAddJobGraph", localStateStorage); + ZooKeeper.createClient(), "/testConcurrentAddJobGraph", localStateStorage, Executors.directExecutor()); otherJobGraphs = new ZooKeeperSubmittedJobGraphStore( - ZooKeeper.createClient(), "/testConcurrentAddJobGraph", localStateStorage); + ZooKeeper.createClient(), "/testConcurrentAddJobGraph", localStateStorage, Executors.directExecutor()); SubmittedJobGraph jobGraph = createSubmittedJobGraph(new JobID(), 0); @@ -250,10 +252,10 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase extends TestLogger { @Test(expected = IllegalStateException.class) public void testUpdateJobGraphYouDidNotGetOrAdd() throws Exception { ZooKeeperSubmittedJobGraphStore jobGraphs = new ZooKeeperSubmittedJobGraphStore( - ZooKeeper.createClient(), "/testUpdateJobGraphYouDidNotGetOrAdd", localStateStorage); + ZooKeeper.createClient(), "/testUpdateJobGraphYouDidNotGetOrAdd", localStateStorage, Executors.directExecutor()); 
ZooKeeperSubmittedJobGraphStore otherJobGraphs = new ZooKeeperSubmittedJobGraphStore( - ZooKeeper.createClient(), "/testUpdateJobGraphYouDidNotGetOrAdd", localStateStorage); + ZooKeeper.createClient(), "/testUpdateJobGraphYouDidNotGetOrAdd", localStateStorage, Executors.directExecutor()); jobGraphs.start(null); otherJobGraphs.start(null); http://git-wip-us.apache.org/repos/asf/flink/blob/3fb92d86/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ClusterShutdownITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ClusterShutdownITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ClusterShutdownITCase.java index 3c8ea75..ddcb4e1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ClusterShutdownITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ClusterShutdownITCase.java @@ -72,7 +72,12 @@ public class ClusterShutdownITCase extends TestLogger { // start job manager which doesn't shutdown the actor system ActorGateway jobManager = - TestingUtils.createJobManager(system, system.dispatcher(), config, "jobmanager1"); + TestingUtils.createJobManager( + system, + system.dispatcher(), + system.dispatcher(), + config, + "jobmanager1"); // Tell the JobManager to inform us of shutdown actions jobManager.tell(TestingMessages.getNotifyOfComponentShutdown(), me); @@ -114,7 +119,12 @@ public class ClusterShutdownITCase extends TestLogger { // start job manager which doesn't shutdown the actor system ActorGateway jobManager = - TestingUtils.createJobManager(system, system.dispatcher(), config, "jobmanager2"); + TestingUtils.createJobManager( + system, + system.dispatcher(), + system.dispatcher(), + config, + "jobmanager2"); // Tell the JobManager to inform us of shutdown actions jobManager.tell(TestingMessages.getNotifyOfComponentShutdown(), me); 
http://git-wip-us.apache.org/repos/asf/flink/blob/3fb92d86/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java index 5a98c8d..e7828fc 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java @@ -75,7 +75,12 @@ public class ResourceManagerITCase extends TestLogger { protected void run() { ActorGateway jobManager = - TestingUtils.createJobManager(system, system.dispatcher(), config, "ReconciliationTest"); + TestingUtils.createJobManager( + system, + system.dispatcher(), + system.dispatcher(), + config, + "ReconciliationTest"); ActorGateway me = TestingUtils.createForwardingActor(system, getTestActor(), Option.<String>empty()); @@ -129,7 +134,12 @@ public class ResourceManagerITCase extends TestLogger { protected void run() { ActorGateway jobManager = - TestingUtils.createJobManager(system, system.dispatcher(), config, "RegTest"); + TestingUtils.createJobManager( + system, + system.dispatcher(), + system.dispatcher(), + config, + "RegTest"); ActorGateway me = TestingUtils.createForwardingActor(system, getTestActor(), Option.<String>empty()); http://git-wip-us.apache.org/repos/asf/flink/blob/3fb92d86/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java index 83eaddb..83983d4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java @@ -86,6 +86,7 @@ public class TaskManagerComponentsStartupShutdownTest { config, actorSystem, actorSystem.dispatcher(), + actorSystem.dispatcher(), JobManager.class, MemoryArchivist.class)._1(); http://git-wip-us.apache.org/repos/asf/flink/blob/3fb92d86/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java index 63c1b29..dead732 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java @@ -103,6 +103,7 @@ public abstract class TaskManagerProcessReapingTestBase { new Configuration(), jmActorSystem, jmActorSystem.dispatcher(), + jmActorSystem.dispatcher(), JobManager.class, MemoryArchivist.class)._1; http://git-wip-us.apache.org/repos/asf/flink/blob/3fb92d86/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java index b21eba0..5753349 100644 --- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java @@ -106,7 +106,11 @@ public class TaskManagerRegistrationTest extends TestLogger { try { // a simple JobManager - jobManager = createJobManager(actorSystem, actorSystem.dispatcher(), config); + jobManager = createJobManager( + actorSystem, + actorSystem.dispatcher(), + actorSystem.dispatcher(), + config); startResourceManager(config, jobManager.actor()); // start two TaskManagers. it will automatically try to register @@ -189,6 +193,7 @@ public class TaskManagerRegistrationTest extends TestLogger { jobManager = createJobManager( actorSystem, actorSystem.dispatcher(), + actorSystem.dispatcher(), new Configuration()); startResourceManager(config, jobManager.actor()); @@ -631,6 +636,7 @@ public class TaskManagerRegistrationTest extends TestLogger { configuration, actorSystem, actorSystem.dispatcher(), + actorSystem.dispatcher(), NONE_STRING, NONE_STRING, JobManager.class, http://git-wip-us.apache.org/repos/asf/flink/blob/3fb92d86/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java index 8f9c932..5db3557 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java @@ -22,6 +22,7 @@ import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.api.BackgroundCallback; import org.apache.curator.framework.api.CuratorEvent; import 
org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.concurrent.Executors; import org.apache.flink.runtime.state.RetrievableStateHandle; import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.TestLogger; @@ -84,7 +85,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger { public void testAdd() throws Exception { LongStateStorage longStateStorage = new LongStateStorage(); ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<Long>( - ZooKeeper.getClient(), longStateStorage); + ZooKeeper.getClient(), longStateStorage, Executors.directExecutor()); // Config final String pathInZooKeeper = "/testAdd"; @@ -119,7 +120,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger { public void testAddWithCreateMode() throws Exception { LongStateStorage longStateStorage = new LongStateStorage(); ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<Long>( - ZooKeeper.getClient(), longStateStorage); + ZooKeeper.getClient(), longStateStorage, Executors.directExecutor()); // Config Long state = 3457347234L; @@ -181,7 +182,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger { LongStateStorage stateHandleProvider = new LongStateStorage(); ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>( - ZooKeeper.getClient(), stateHandleProvider); + ZooKeeper.getClient(), stateHandleProvider, Executors.directExecutor()); ZooKeeper.getClient().create().forPath("/testAddAlreadyExistingPath"); @@ -200,7 +201,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger { when(client.create()).thenThrow(new RuntimeException("Expected test Exception.")); ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>( - client, stateHandleProvider); + client, stateHandleProvider, Executors.directExecutor()); // Config final String pathInZooKeeper = "/testAddDiscardStateHandleAfterFailure"; @@ -230,7 +231,7 @@ public class 
ZooKeeperStateHandleStoreITCase extends TestLogger { LongStateStorage stateHandleProvider = new LongStateStorage(); ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>( - ZooKeeper.getClient(), stateHandleProvider); + ZooKeeper.getClient(), stateHandleProvider, Executors.directExecutor()); // Config final String pathInZooKeeper = "/testReplace"; @@ -269,7 +270,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger { RetrievableStateStorageHelper<Long> stateStorage = new LongStateStorage(); ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>( - ZooKeeper.getClient(), stateStorage); + ZooKeeper.getClient(), stateStorage, Executors.directExecutor()); store.replace("/testReplaceNonExistingPath", 0, 1L); } @@ -286,7 +287,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger { when(client.setData()).thenThrow(new RuntimeException("Expected test Exception.")); ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>( - client, stateHandleProvider); + client, stateHandleProvider, Executors.directExecutor()); // Config final String pathInZooKeeper = "/testReplaceDiscardStateHandleAfterFailure"; @@ -328,7 +329,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger { LongStateStorage stateHandleProvider = new LongStateStorage(); ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>( - ZooKeeper.getClient(), stateHandleProvider); + ZooKeeper.getClient(), stateHandleProvider, Executors.directExecutor()); // Config final String pathInZooKeeper = "/testGetAndExists"; @@ -353,7 +354,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger { LongStateStorage stateHandleProvider = new LongStateStorage(); ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>( - ZooKeeper.getClient(), stateHandleProvider); + ZooKeeper.getClient(), stateHandleProvider, Executors.directExecutor()); store.get("/testGetNonExistingPath"); } @@ -367,7 +368,7 @@ 
public class ZooKeeperStateHandleStoreITCase extends TestLogger { LongStateStorage stateHandleProvider = new LongStateStorage(); ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>( - ZooKeeper.getClient(), stateHandleProvider); + ZooKeeper.getClient(), stateHandleProvider, Executors.directExecutor()); // Config final String pathInZooKeeper = "/testGetAll"; @@ -398,7 +399,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger { LongStateStorage stateHandleProvider = new LongStateStorage(); ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>( - ZooKeeper.getClient(), stateHandleProvider); + ZooKeeper.getClient(), stateHandleProvider, Executors.directExecutor()); // Config final String pathInZooKeeper = "/testGetAllSortedByName"; @@ -428,7 +429,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger { LongStateStorage stateHandleProvider = new LongStateStorage(); ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>( - ZooKeeper.getClient(), stateHandleProvider); + ZooKeeper.getClient(), stateHandleProvider, Executors.directExecutor()); // Config final String pathInZooKeeper = "/testRemove"; @@ -452,7 +453,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger { LongStateStorage stateHandleProvider = new LongStateStorage(); ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>( - ZooKeeper.getClient(), stateHandleProvider); + ZooKeeper.getClient(), stateHandleProvider, Executors.directExecutor()); // Config final String pathInZooKeeper = "/testRemoveWithCallback"; @@ -491,7 +492,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger { LongStateStorage stateHandleProvider = new LongStateStorage(); ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>( - ZooKeeper.getClient(), stateHandleProvider); + ZooKeeper.getClient(), stateHandleProvider, Executors.directExecutor()); // Config final String pathInZooKeeper = 
"/testDiscard"; @@ -513,7 +514,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger { LongStateStorage stateHandleProvider = new LongStateStorage(); ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>( - ZooKeeper.getClient(), stateHandleProvider); + ZooKeeper.getClient(), stateHandleProvider, Executors.directExecutor()); // Config final String pathInZooKeeper = "/testDiscardAll"; http://git-wip-us.apache.org/repos/asf/flink/blob/3fb92d86/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala index 4485b65..cf00206 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala @@ -173,6 +173,7 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll { new Configuration(), _system, _system.dispatcher, + _system.dispatcher, None, None, classOf[JobManager], http://git-wip-us.apache.org/repos/asf/flink/blob/3fb92d86/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala index b57a9dc..75891ed 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala @@ -303,18 +303,21 @@ object TestingUtils { /** Creates a testing JobManager using the 
default recovery mode (standalone) * * @param actorSystem The ActorSystem to use - * @param executor to run the JobManager's futures + * @param futureExecutor to run the JobManager's futures + * @param ioExecutor to run blocking io operations * @param configuration The Flink configuration * @return */ def createJobManager( actorSystem: ActorSystem, - executor: Executor, + futureExecutor: Executor, + ioExecutor: Executor, configuration: Configuration) : ActorGateway = { createJobManager( actorSystem, - executor, + futureExecutor, + ioExecutor, configuration, classOf[TestingJobManager], "" @@ -325,20 +328,23 @@ object TestingUtils { * Additional prefix can be supplied for the Actor system names * * @param actorSystem The ActorSystem to use - * @param executor to run the JobManager's futures + * @param futureExecutor to run the JobManager's futures + * @param ioExecutor to run blocking io operations * @param configuration The Flink configuration * @param prefix The prefix for the actor names * @return */ def createJobManager( actorSystem: ActorSystem, - executor: Executor, + futureExecutor: Executor, + ioExecutor: Executor, configuration: Configuration, prefix: String) : ActorGateway = { createJobManager( actorSystem, - executor, + futureExecutor, + ioExecutor, configuration, classOf[TestingJobManager], prefix @@ -349,19 +355,21 @@ object TestingUtils { * Creates a JobManager of the given class using the default recovery mode (standalone) * * @param actorSystem ActorSystem to use - * @param executor to run the JobManager's futures + * @param futureExecutor to run the JobManager's futures + * @param ioExecutor to run blocking io operations * @param configuration Configuration to use * @param jobManagerClass JobManager class to instantiate * @return */ def createJobManager( actorSystem: ActorSystem, - executor: Executor, + futureExecutor: Executor, + ioExecutor: Executor, configuration: Configuration, jobManagerClass: Class[_ <: JobManager]) : ActorGateway = { - 
createJobManager(actorSystem, executor, configuration, jobManagerClass, "") + createJobManager(actorSystem, futureExecutor, ioExecutor, configuration, jobManagerClass, "") } /** @@ -369,7 +377,8 @@ object TestingUtils { * Additional prefix for the Actor names can be added. * * @param actorSystem ActorSystem to use - * @param executor to run the JobManager's futures + * @param futureExecutor to run the JobManager's futures + * @param ioExecutor to run blocking io operations * @param configuration Configuration to use * @param jobManagerClass JobManager class to instantiate * @param prefix The prefix to use for the Actor names @@ -377,7 +386,8 @@ object TestingUtils { */ def createJobManager( actorSystem: ActorSystem, - executor: Executor, + futureExecutor: Executor, + ioExecutor: Executor, configuration: Configuration, jobManagerClass: Class[_ <: JobManager], prefix: String) @@ -390,7 +400,8 @@ object TestingUtils { val (actor, _) = JobManager.startJobManagerActors( configuration, actorSystem, - executor, + futureExecutor, + ioExecutor, Some(prefix + JobManager.JOB_MANAGER_NAME), Some(prefix + JobManager.ARCHIVE_NAME), jobManagerClass, http://git-wip-us.apache.org/repos/asf/flink/blob/3fb92d86/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java index af86983..0ff2e78 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java @@ -130,6 +130,7 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test jmConfig, 
jmActorSystem, jmActorSystem.dispatcher(), + jmActorSystem.dispatcher(), JobManager.class, MemoryArchivist.class)._1(); http://git-wip-us.apache.org/repos/asf/flink/blob/3fb92d86/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java index f72ef34..8243e97 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java @@ -105,6 +105,7 @@ public class ProcessFailureCancelingITCase { jmConfig, jmActorSystem, jmActorSystem.dispatcher(), + jmActorSystem.dispatcher(), JobManager.class, MemoryArchivist.class)._1(); http://git-wip-us.apache.org/repos/asf/flink/blob/3fb92d86/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java index 002e162..da5959b 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java @@ -215,10 +215,14 @@ public class YarnApplicationMasterRunner { int numberProcessors = Hardware.getNumberCPUCores(); - final ExecutorService executor = Executors.newFixedThreadPool( + final ExecutorService futureExecutor = Executors.newFixedThreadPool( numberProcessors, new NamedThreadFactory("yarn-jobmanager-future-", "-thread-")); + final ExecutorService ioExecutor = Executors.newFixedThreadPool( + numberProcessors, + new 
NamedThreadFactory("yarn-jobmanager-io-", "-thread-")); + try { // ------- (1) load and parse / validate all configurations ------- @@ -333,7 +337,8 @@ public class YarnApplicationMasterRunner { ActorRef jobManager = JobManager.startJobManagerActors( config, actorSystem, - executor, + futureExecutor, + ioExecutor, new scala.Some<>(JobManager.JOB_MANAGER_NAME()), scala.Option.<String>empty(), getJobManagerClass(), @@ -427,7 +432,8 @@ public class YarnApplicationMasterRunner { } } - executor.shutdownNow(); + futureExecutor.shutdownNow(); + ioExecutor.shutdownNow(); return 0; }
