Repository: flink
Updated Branches:
  refs/heads/master 698e53e47 -> c590912c9


[FLINK-5073] Use Executor to run ZooKeeper callbacks in ZooKeeperStateHandleStore

Use a dedicated Executor to run ZooKeeper callbacks in ZooKeeperStateHandleStore
instead of running them in the ZooKeeper client's thread. The callbacks can block
because they discard state, which might entail deleting files from disk.

Introduce a dedicated Executor for blocking I/O operations.

This closes #2815.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3fb92d86
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3fb92d86
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3fb92d86

Branch: refs/heads/master
Commit: 3fb92d8687f03c1fac8b87396b2b5a7ff29f6dd6
Parents: ae4b274
Author: Till Rohrmann <[email protected]>
Authored: Tue Nov 15 22:45:04 2016 +0100
Committer: Till Rohrmann <[email protected]>
Committed: Tue Nov 22 23:00:16 2016 +0100

----------------------------------------------------------------------
 .../MesosApplicationMasterRunner.java           | 12 +++-
 .../BackPressureStatsTrackerITCase.java         |  6 +-
 .../StackTraceSampleCoordinatorITCase.java      |  6 +-
 .../webmonitor/WebRuntimeMonitorITCase.java     |  1 +
 .../ZooKeeperCheckpointRecoveryFactory.java     | 12 +++-
 .../ZooKeeperCompletedCheckpointStore.java      |  7 ++-
 .../ZooKeeperSubmittedJobGraphStore.java        |  7 ++-
 .../flink/runtime/util/ZooKeeperUtils.java      | 20 ++++---
 .../RetrievableStateStorageHelper.java          |  2 +-
 .../zookeeper/ZooKeeperStateHandleStore.java    | 13 ++++-
 .../flink/runtime/jobmanager/JobManager.scala   | 59 +++++++++++++-------
 .../runtime/minicluster/FlinkMiniCluster.scala  |  9 ++-
 .../minicluster/LocalFlinkMiniCluster.scala     |  5 +-
 ...ZooKeeperCompletedCheckpointStoreITCase.java |  3 +-
 .../runtime/jobmanager/JobManagerTest.java      |  4 ++
 .../flink/runtime/jobmanager/JobSubmitTest.java |  1 +
 .../ZooKeeperSubmittedJobGraphsStoreITCase.java | 14 +++--
 .../resourcemanager/ClusterShutdownITCase.java  | 14 ++++-
 .../resourcemanager/ResourceManagerITCase.java  | 14 ++++-
 ...askManagerComponentsStartupShutdownTest.java |  1 +
 .../TaskManagerProcessReapingTestBase.java      |  1 +
 .../TaskManagerRegistrationTest.java            |  8 ++-
 .../ZooKeeperStateHandleStoreITCase.java        | 31 +++++-----
 .../jobmanager/JobManagerRegistrationTest.scala |  1 +
 .../runtime/testingUtils/TestingUtils.scala     | 35 ++++++++----
 ...ctTaskManagerProcessFailureRecoveryTest.java |  1 +
 .../recovery/ProcessFailureCancelingITCase.java |  1 +
 .../flink/yarn/YarnApplicationMasterRunner.java | 12 +++-
 28 files changed, 212 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3fb92d86/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
index 09f87bd..166218f 100644
--- 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
@@ -174,10 +174,14 @@ public class MesosApplicationMasterRunner {
 
                int numberProcessors = Hardware.getNumberCPUCores();
 
-               final ExecutorService executor = Executors.newFixedThreadPool(
+               final ExecutorService futureExecutor = 
Executors.newFixedThreadPool(
                        numberProcessors,
                        new NamedThreadFactory("mesos-jobmanager-future-", 
"-thread-"));
 
+               final ExecutorService ioExecutor = Executors.newFixedThreadPool(
+                       numberProcessors,
+                       new NamedThreadFactory("mesos-jobmanager-io-", 
"-thread-"));
+
                try {
                        // ------- (1) load and parse / validate all 
configurations -------
 
@@ -293,7 +297,8 @@ public class MesosApplicationMasterRunner {
                        ActorRef jobManager = JobManager.startJobManagerActors(
                                config,
                                actorSystem,
-                               executor,
+                               futureExecutor,
+                               ioExecutor,
                                new scala.Some<>(JobManager.JOB_MANAGER_NAME()),
                                scala.Option.<String>empty(),
                                getJobManagerClass(),
@@ -399,7 +404,8 @@ public class MesosApplicationMasterRunner {
                        LOG.error("Failed to stop the artifact server", t);
                }
 
-               executor.shutdownNow();
+               futureExecutor.shutdownNow();
+               ioExecutor.shutdownNow();
 
                return 0;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/3fb92d86/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
index d01e0cf..357301a 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
@@ -120,7 +120,11 @@ public class BackPressureStatsTrackerITCase extends 
TestLogger {
                        }
 
                        try {
-                               jobManger = 
TestingUtils.createJobManager(testActorSystem, testActorSystem.dispatcher(), 
new Configuration());
+                               jobManger = TestingUtils.createJobManager(
+                                       testActorSystem,
+                                       testActorSystem.dispatcher(),
+                                       testActorSystem.dispatcher(),
+                                       new Configuration());
 
                                final Configuration config = new 
Configuration();
                                
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism);

http://git-wip-us.apache.org/repos/asf/flink/blob/3fb92d86/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
index 4b5bd2f..f31f41f 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
@@ -90,7 +90,11 @@ public class StackTraceSampleCoordinatorITCase extends 
TestLogger {
                        ActorGateway taskManager = null;
 
                        try {
-                               jobManger = 
TestingUtils.createJobManager(testActorSystem, testActorSystem.dispatcher(), 
new Configuration());
+                               jobManger = TestingUtils.createJobManager(
+                                       testActorSystem,
+                                       testActorSystem.dispatcher(),
+                                       testActorSystem.dispatcher(),
+                                       new Configuration());
 
                                final Configuration config = new 
Configuration();
                                
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism);

http://git-wip-us.apache.org/repos/asf/flink/blob/3fb92d86/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
index fcdf94d..853ef14 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
@@ -180,6 +180,7 @@ public class WebRuntimeMonitorITCase extends TestLogger {
                                        jmConfig,
                                        jobManagerSystem[i],
                                        jobManagerSystem[i].dispatcher(),
+                                       jobManagerSystem[i].dispatcher(),
                                        JobManager.class,
                                        MemoryArchivist.class)._1();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3fb92d86/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
index f47012d..09bfa8c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
@@ -24,6 +24,8 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
 
+import java.util.concurrent.Executor;
+
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -35,9 +37,15 @@ public class ZooKeeperCheckpointRecoveryFactory implements 
CheckpointRecoveryFac
 
        private final Configuration config;
 
-       public ZooKeeperCheckpointRecoveryFactory(CuratorFramework client, 
Configuration config) {
+       private final Executor executor;
+
+       public ZooKeeperCheckpointRecoveryFactory(
+                       CuratorFramework client,
+                       Configuration config,
+                       Executor executor) {
                this.client = checkNotNull(client, "Curator client");
                this.config = checkNotNull(config, "Configuration");
+               this.executor = checkNotNull(executor, "Executor");
        }
 
        @Override
@@ -55,7 +63,7 @@ public class ZooKeeperCheckpointRecoveryFactory implements 
CheckpointRecoveryFac
                        throws Exception {
 
                return ZooKeeperUtils.createCompletedCheckpoints(client, 
config, jobId,
-                               NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN);
+                               NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN, 
executor);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/3fb92d86/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
index 4f67921..4add504 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
@@ -37,6 +37,7 @@ import java.util.ArrayList;
 import java.util.ConcurrentModificationException;
 import java.util.List;
 import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -92,13 +93,15 @@ public class ZooKeeperCompletedCheckpointStore implements 
CompletedCheckpointSto
         *                                       start with a '/')
         * @param stateStorage                   State storage to be used to 
persist the completed
         *                                       checkpoint
+        * @param executor to give to the ZooKeeperStateHandleStore to run 
ZooKeeper callbacks
         * @throws Exception
         */
        public ZooKeeperCompletedCheckpointStore(
                        int maxNumberOfCheckpointsToRetain,
                        CuratorFramework client,
                        String checkpointsPath,
-                       RetrievableStateStorageHelper<CompletedCheckpoint> 
stateStorage) throws Exception {
+                       RetrievableStateStorageHelper<CompletedCheckpoint> 
stateStorage,
+                       Executor executor) throws Exception {
 
                checkArgument(maxNumberOfCheckpointsToRetain >= 1, "Must retain 
at least one checkpoint.");
                checkNotNull(stateStorage, "State storage");
@@ -115,7 +118,7 @@ public class ZooKeeperCompletedCheckpointStore implements 
CompletedCheckpointSto
                // All operations will have the path as root
                this.client = client.usingNamespace(client.getNamespace() + 
checkpointsPath);
 
-               this.checkpointsInZooKeeper = new 
ZooKeeperStateHandleStore<>(this.client, stateStorage);
+               this.checkpointsInZooKeeper = new 
ZooKeeperStateHandleStore<>(this.client, stateStorage, executor);
 
                this.checkpointStateHandles = new 
ArrayDeque<>(maxNumberOfCheckpointsToRetain + 1);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3fb92d86/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
index ec05f1e..b241712 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
@@ -39,6 +39,7 @@ import java.util.ConcurrentModificationException;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.Executor;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
@@ -93,12 +94,14 @@ public class ZooKeeperSubmittedJobGraphStore implements 
SubmittedJobGraphStore {
         * @param client ZooKeeper client
         * @param currentJobsPath ZooKeeper path for current job graphs
         * @param stateStorage State storage used to persist the submitted jobs
+        * @param executor to give to the ZooKeeperStateHandleStore to run 
ZooKeeper callbacks
         * @throws Exception
         */
        public ZooKeeperSubmittedJobGraphStore(
                        CuratorFramework client,
                        String currentJobsPath,
-                       RetrievableStateStorageHelper<SubmittedJobGraph> 
stateStorage) throws Exception {
+                       RetrievableStateStorageHelper<SubmittedJobGraph> 
stateStorage,
+                       Executor executor) throws Exception {
 
                checkNotNull(currentJobsPath, "Current jobs path");
                checkNotNull(stateStorage, "State storage");
@@ -114,7 +117,7 @@ public class ZooKeeperSubmittedJobGraphStore implements 
SubmittedJobGraphStore {
                // All operations will have the path as root
                CuratorFramework facade = 
client.usingNamespace(client.getNamespace() + currentJobsPath);
 
-               this.jobGraphsInZooKeeper = new 
ZooKeeperStateHandleStore<>(facade, stateStorage);
+               this.jobGraphsInZooKeeper = new 
ZooKeeperStateHandleStore<>(facade, stateStorage, executor);
 
                this.pathCache = new PathChildrenCache(facade, "/", false);
                pathCache.getListenable().addListener(new 
SubmittedJobGraphsPathCacheListener());

http://git-wip-us.apache.org/repos/asf/flink/blob/3fb92d86/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
index a9887a6..70ac6c8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
@@ -49,6 +49,7 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.List;
+import java.util.concurrent.Executor;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -216,12 +217,14 @@ public class ZooKeeperUtils {
         *
         * @param client        The {@link CuratorFramework} ZooKeeper client 
to use
         * @param configuration {@link Configuration} object
+        * @param executor to run ZooKeeper callbacks
         * @return {@link ZooKeeperSubmittedJobGraphStore} instance
         * @throws Exception if the submitted job graph store cannot be created
         */
        public static ZooKeeperSubmittedJobGraphStore createSubmittedJobGraphs(
                        CuratorFramework client,
-                       Configuration configuration) throws Exception {
+                       Configuration configuration,
+                       Executor executor) throws Exception {
 
                checkNotNull(configuration, "Configuration");
 
@@ -235,7 +238,7 @@ public class ZooKeeperUtils {
                                ConfigConstants.ZOOKEEPER_JOBGRAPHS_PATH);
 
                return new ZooKeeperSubmittedJobGraphStore(
-                               client, zooKeeperSubmittedJobsPath, 
stateStorage);
+                               client, zooKeeperSubmittedJobsPath, 
stateStorage, executor);
        }
 
        /**
@@ -245,6 +248,7 @@ public class ZooKeeperUtils {
         * @param configuration                  {@link Configuration} object
         * @param jobId                          ID of job to create the 
instance for
         * @param maxNumberOfCheckpointsToRetain The maximum number of 
checkpoints to retain
+        * @param executor to run ZooKeeper callbacks
         * @return {@link ZooKeeperCompletedCheckpointStore} instance
         * @throws Exception if the completed checkpoint store cannot be created
         */
@@ -252,7 +256,8 @@ public class ZooKeeperUtils {
                        CuratorFramework client,
                        Configuration configuration,
                        JobID jobId,
-                       int maxNumberOfCheckpointsToRetain) throws Exception {
+                       int maxNumberOfCheckpointsToRetain,
+                       Executor executor) throws Exception {
 
                checkNotNull(configuration, "Configuration");
 
@@ -269,10 +274,11 @@ public class ZooKeeperUtils {
                checkpointsPath += 
ZooKeeperSubmittedJobGraphStore.getPathForJob(jobId);
 
                return new ZooKeeperCompletedCheckpointStore(
-                               maxNumberOfCheckpointsToRetain,
-                               client,
-                               checkpointsPath,
-                               stateStorage);
+                       maxNumberOfCheckpointsToRetain,
+                       client,
+                       checkpointsPath,
+                       stateStorage,
+                       executor);
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/3fb92d86/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/RetrievableStateStorageHelper.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/RetrievableStateStorageHelper.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/RetrievableStateStorageHelper.java
index 1434f74..f6acea3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/RetrievableStateStorageHelper.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/RetrievableStateStorageHelper.java
@@ -23,7 +23,7 @@ import org.apache.flink.runtime.state.RetrievableStateHandle;
 import java.io.Serializable;
 
 /**
- * State storage helper which is used by {@link ZooKeeperStateHandleStore} to 
persiste state before
+ * State storage helper which is used by {@link ZooKeeperStateHandleStore} to 
persist state before
  * the state handle is written to ZooKeeper.
  *
  * @param <T> The type of the data that can be stored by this storage helper.

http://git-wip-us.apache.org/repos/asf/flink/blob/3fb92d86/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
index 5623715..14d9f6f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
@@ -30,10 +30,10 @@ import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.Executor;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -75,6 +75,8 @@ public class ZooKeeperStateHandleStore<T extends 
Serializable> {
 
        private final RetrievableStateStorageHelper<T> storage;
 
+       private final Executor executor;
+
        /**
         * Creates a {@link ZooKeeperStateHandleStore}.
         *
@@ -82,13 +84,18 @@ public class ZooKeeperStateHandleStore<T extends 
Serializable> {
         *                            expected that the client's namespace 
ensures that the root
         *                            path is exclusive for all state handles 
managed by this
         *                            instance, e.g. 
<code>client.usingNamespace("/stateHandles")</code>
+        * @param storage to persist the actual state and whose returned state 
handle is then written
+        *                to ZooKeeper
+        * @param executor to run the ZooKeeper callbacks
         */
        public ZooKeeperStateHandleStore(
                CuratorFramework client,
-               RetrievableStateStorageHelper<T> storage) throws IOException {
+               RetrievableStateStorageHelper<T> storage,
+               Executor executor) {
 
                this.client = checkNotNull(client, "Curator client");
                this.storage = checkNotNull(storage, "State storage");
+               this.executor = checkNotNull(executor);
        }
 
        /**
@@ -348,7 +355,7 @@ public class ZooKeeperStateHandleStore<T extends 
Serializable> {
                checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
                checkNotNull(callback, "Background callback");
 
-               
client.delete().deletingChildrenIfNeeded().inBackground(callback).forPath(pathInZooKeeper);
+               
client.delete().deletingChildrenIfNeeded().inBackground(callback, 
executor).forPath(pathInZooKeeper);
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/3fb92d86/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index df80d72..08ed0a4 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1970,24 +1970,30 @@ object JobManager {
 
     val numberProcessors = Hardware.getNumberCPUCores()
 
-    val executor = Executors.newFixedThreadPool(
+    val futureExecutor = Executors.newFixedThreadPool(
       numberProcessors,
       new NamedThreadFactory("jobmanager-future-", "-thread-"))
 
+    val ioExecutor = Executors.newFixedThreadPool(
+      numberProcessors,
+      new NamedThreadFactory("jobmanager-io-", "-thread-")
+    )
+
     val (jobManagerSystem, _, _, webMonitorOption, _) = try {
       startActorSystemAndJobManagerActors(
         configuration,
         executionMode,
         listeningAddress,
         listeningPort,
-        executor,
+        futureExecutor,
+        ioExecutor,
         classOf[JobManager],
         classOf[MemoryArchivist],
         Option(classOf[StandaloneResourceManager])
       )
     } catch {
       case t: Throwable =>
-          executor.shutdownNow()
+          futureExecutor.shutdownNow()
 
         throw t
     }
@@ -2005,7 +2011,8 @@ object JobManager {
         }
     }
 
-    executor.shutdownNow()
+    futureExecutor.shutdownNow()
+    ioExecutor.shutdownNow()
   }
 
   /**
@@ -2113,7 +2120,8 @@ object JobManager {
     *                      additional TaskManager in the same process.
     * @param listeningAddress The hostname where the JobManager should listen 
for messages.
     * @param listeningPort The port where the JobManager should listen for 
messages
-    * @param executor to run the JobManager's futures
+    * @param futureExecutor to run the JobManager's futures
+    * @param ioExecutor to run blocking io operations
     * @param jobManagerClass The class of the JobManager to be started
     * @param archiveClass The class of the Archivist to be started
     * @param resourceManagerClass Optional class of resource manager if one 
should be started
@@ -2125,7 +2133,8 @@ object JobManager {
       executionMode: JobManagerMode,
       listeningAddress: String,
       listeningPort: Int,
-      executor: Executor,
+      futureExecutor: Executor,
+      ioExecutor: Executor,
       jobManagerClass: Class[_ <: JobManager],
       archiveClass: Class[_ <: MemoryArchivist],
       resourceManagerClass: Option[Class[_ <: FlinkResourceManager[_]]])
@@ -2194,7 +2203,8 @@ object JobManager {
       val (jobManager, archive) = startJobManagerActors(
         configuration,
         jobManagerSystem,
-        executor,
+        futureExecutor,
+        ioExecutor,
         jobManagerClass,
         archiveClass)
 
@@ -2395,14 +2405,16 @@ object JobManager {
    *              delayBetweenRetries, timeout)
    *
    * @param configuration The configuration from which to parse the config 
values.
-   * @param executor to run JobManager's futures
+   * @param futureExecutor to run JobManager's futures
+   * @param ioExecutor to run blocking io operations
    * @param leaderElectionServiceOption LeaderElectionService which shall be 
returned if the option
    *                                    is defined
    * @return The members for a default JobManager.
    */
   def createJobManagerComponents(
       configuration: Configuration,
-      executor: Executor,
+      futureExecutor: Executor,
+      ioExecutor: Executor,
       leaderElectionServiceOption: Option[LeaderElectionService]) :
     (InstanceManager,
     FlinkScheduler,
@@ -2433,11 +2445,11 @@ object JobManager {
     var instanceManager: InstanceManager = null
     var scheduler: FlinkScheduler = null
     var libraryCacheManager: BlobLibraryCacheManager = null
-    
+
     try {
       blobServer = new BlobServer(configuration)
       instanceManager = new InstanceManager()
-      scheduler = new FlinkScheduler(ExecutionContext.fromExecutor(executor))
+      scheduler = new 
FlinkScheduler(ExecutionContext.fromExecutor(futureExecutor))
       libraryCacheManager = new BlobLibraryCacheManager(blobServer, 
cleanupInterval)
 
       instanceManager.addInstanceListener(scheduler)
@@ -2482,8 +2494,8 @@ object JobManager {
           }
 
           (leaderElectionService,
-            ZooKeeperUtils.createSubmittedJobGraphs(client, configuration),
-            new ZooKeeperCheckpointRecoveryFactory(client, configuration))
+            ZooKeeperUtils.createSubmittedJobGraphs(client, configuration, 
ioExecutor),
+            new ZooKeeperCheckpointRecoveryFactory(client, configuration, 
ioExecutor))
       }
 
     val jobRecoveryTimeoutStr = 
configuration.getValue(HighAvailabilityOptions.HA_JOB_DELAY)
@@ -2527,14 +2539,17 @@ object JobManager {
    *
    * @param configuration The configuration for the JobManager
    * @param actorSystem The actor system running the JobManager
+   * @param futureExecutor to run JobManager's futures
+   * @param ioExecutor to run blocking io operations
    * @param jobManagerClass The class of the JobManager to be started
    * @param archiveClass The class of the MemoryArchivist to be started
-    * @return A tuple of references (JobManager Ref, Archiver Ref)
+   * @return A tuple of references (JobManager Ref, Archiver Ref)
    */
   def startJobManagerActors(
       configuration: Configuration,
       actorSystem: ActorSystem,
-      executor: Executor,
+      futureExecutor: Executor,
+      ioExecutor: Executor,
       jobManagerClass: Class[_ <: JobManager],
       archiveClass: Class[_ <: MemoryArchivist])
     : (ActorRef, ActorRef) = {
@@ -2542,7 +2557,8 @@ object JobManager {
     startJobManagerActors(
       configuration,
       actorSystem,
-      executor,
+      futureExecutor,
+      ioExecutor,
       Some(JOB_MANAGER_NAME),
       Some(ARCHIVE_NAME),
       jobManagerClass,
@@ -2555,7 +2571,8 @@ object JobManager {
    *
    * @param configuration The configuration for the JobManager
    * @param actorSystem The actor system running the JobManager
-   * @param executor to run JobManager's futures
+   * @param futureExecutor to run JobManager's futures
+   * @param ioExecutor to run blocking io operations
    * @param jobManagerActorName Optionally the name of the JobManager actor. 
If none is given,
    *                          the actor will have the name generated by the 
actor system.
    * @param archiveActorName Optionally the name of the archive actor. If none 
is given,
@@ -2567,7 +2584,8 @@ object JobManager {
   def startJobManagerActors(
       configuration: Configuration,
       actorSystem: ActorSystem,
-      executor: Executor,
+      futureExecutor: Executor,
+      ioExecutor: Executor,
       jobManagerActorName: Option[String],
       archiveActorName: Option[String],
       jobManagerClass: Class[_ <: JobManager],
@@ -2586,7 +2604,8 @@ object JobManager {
     jobRecoveryTimeout,
     metricsRegistry) = createJobManagerComponents(
       configuration,
-      executor,
+      futureExecutor,
+      ioExecutor,
       None)
 
     val archiveProps = getArchiveProps(archiveClass, archiveCount)
@@ -2600,7 +2619,7 @@ object JobManager {
     val jobManagerProps = getJobManagerProps(
       jobManagerClass,
       configuration,
-      executor,
+      futureExecutor,
       instanceManager,
       scheduler,
       libraryCacheManager,

http://git-wip-us.apache.org/repos/asf/flink/blob/3fb92d86/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index d9a208d..4367442 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -105,7 +105,11 @@ abstract class FlinkMiniCluster(
 
   private var isRunning = false
 
-  val executor = Executors.newFixedThreadPool(
+  val futureExecutor = Executors.newFixedThreadPool(
+    Hardware.getNumberCPUCores(),
+    new NamedThreadFactory("mini-cluster-future-", "-thread"))
+
+  val ioExecutor = Executors.newFixedThreadPool(
     Hardware.getNumberCPUCores(),
     new NamedThreadFactory("mini-cluster-future-", "-thread"))
 
@@ -405,7 +409,8 @@ abstract class FlinkMiniCluster(
     jobManagerLeaderRetrievalService.foreach(_.stop())
     isRunning = false
 
-    executor.shutdownNow
+    futureExecutor.shutdownNow()
+    ioExecutor.shutdownNow()
   }
 
   protected def shutdown(): Unit = {

http://git-wip-us.apache.org/repos/asf/flink/blob/3fb92d86/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index 59dd399..b2aedf7 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -126,7 +126,8 @@ class LocalFlinkMiniCluster(
     jobRecoveryTimeout,
     metricsRegistry) = JobManager.createJobManagerComponents(
       config,
-      executor,
+      futureExecutor,
+      ioExecutor,
       createLeaderElectionService())
 
     val archive = system.actorOf(
@@ -139,7 +140,7 @@ class LocalFlinkMiniCluster(
       getJobManagerProps(
         jobManagerClass,
         config,
-        executor,
+        futureExecutor,
         instanceManager,
         scheduler,
         libraryCacheManager,

http://git-wip-us.apache.org/repos/asf/flink/blob/3fb92d86/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
index 2e44ecd..f46f7d2 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.state.RetrievableStateHandle;
 import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
@@ -68,7 +69,7 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends 
CompletedCheckpoint
                        public RetrievableStateHandle<CompletedCheckpoint> 
store(CompletedCheckpoint state) throws Exception {
                                return new 
HeapRetrievableStateHandle<CompletedCheckpoint>(state);
                        }
-               });
+               }, Executors.directExecutor());
        }
 
        // 
---------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/3fb92d86/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index aeb1ae1..f941c24 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -400,6 +400,7 @@ public class JobManagerTest {
                                        config,
                                        system,
                                        system.dispatcher(),
+                                       system.dispatcher(),
                                        TestingJobManager.class,
                                        MemoryArchivist.class)._1(),
                                leaderSessionId);
@@ -607,6 +608,7 @@ public class JobManagerTest {
                                config,
                                actorSystem,
                                actorSystem.dispatcher(),
+                               actorSystem.dispatcher(),
                                Option.apply("jm"),
                                Option.apply("arch"),
                                TestingJobManager.class,
@@ -734,6 +736,7 @@ public class JobManagerTest {
                                config,
                                actorSystem,
                                actorSystem.dispatcher(),
+                               actorSystem.dispatcher(),
                                Option.apply("jm"),
                                Option.apply("arch"),
                                TestingJobManager.class,
@@ -831,6 +834,7 @@ public class JobManagerTest {
                                new Configuration(),
                                actorSystem,
                                actorSystem.dispatcher(),
+                               actorSystem.dispatcher(),
                                Option.apply("jm"),
                                Option.apply("arch"),
                                TestingJobManager.class,

http://git-wip-us.apache.org/repos/asf/flink/blob/3fb92d86/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
index 4f26e68..1b8f0c3 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
@@ -83,6 +83,7 @@ public class JobSubmitTest {
                        jmConfig,
                        jobManagerSystem,
                        jobManagerSystem.dispatcher(),
+                       jobManagerSystem.dispatcher(),
                        JobManager.class,
                        MemoryArchivist.class)._1();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3fb92d86/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
index 7d21cfd..d156f02 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.jobmanager;
 import akka.actor.ActorRef;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import 
org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore.SubmittedJobGraphListener;
@@ -89,7 +90,8 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase extends 
TestLogger {
                ZooKeeperSubmittedJobGraphStore jobGraphs = new 
ZooKeeperSubmittedJobGraphStore(
                        ZooKeeper.createClient(),
                        "/testPutAndRemoveJobGraph",
-                       localStateStorage);
+                       localStateStorage,
+                       Executors.directExecutor());
 
                try {
                        SubmittedJobGraphListener listener = 
mock(SubmittedJobGraphListener.class);
@@ -141,7 +143,7 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase extends 
TestLogger {
        @Test
        public void testRecoverJobGraphs() throws Exception {
                ZooKeeperSubmittedJobGraphStore jobGraphs = new 
ZooKeeperSubmittedJobGraphStore(
-                               ZooKeeper.createClient(), 
"/testRecoverJobGraphs", localStateStorage);
+                               ZooKeeper.createClient(), 
"/testRecoverJobGraphs", localStateStorage, Executors.directExecutor());
 
                try {
                        SubmittedJobGraphListener listener = 
mock(SubmittedJobGraphListener.class);
@@ -191,10 +193,10 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase 
extends TestLogger {
 
                try {
                        jobGraphs = new ZooKeeperSubmittedJobGraphStore(
-                                       ZooKeeper.createClient(), 
"/testConcurrentAddJobGraph", localStateStorage);
+                                       ZooKeeper.createClient(), 
"/testConcurrentAddJobGraph", localStateStorage, Executors.directExecutor());
 
                        otherJobGraphs = new ZooKeeperSubmittedJobGraphStore(
-                                       ZooKeeper.createClient(), 
"/testConcurrentAddJobGraph", localStateStorage);
+                                       ZooKeeper.createClient(), 
"/testConcurrentAddJobGraph", localStateStorage, Executors.directExecutor());
 
 
                        SubmittedJobGraph jobGraph = 
createSubmittedJobGraph(new JobID(), 0);
@@ -250,10 +252,10 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase 
extends TestLogger {
        @Test(expected = IllegalStateException.class)
        public void testUpdateJobGraphYouDidNotGetOrAdd() throws Exception {
                ZooKeeperSubmittedJobGraphStore jobGraphs = new 
ZooKeeperSubmittedJobGraphStore(
-                               ZooKeeper.createClient(), 
"/testUpdateJobGraphYouDidNotGetOrAdd", localStateStorage);
+                               ZooKeeper.createClient(), 
"/testUpdateJobGraphYouDidNotGetOrAdd", localStateStorage, 
Executors.directExecutor());
 
                ZooKeeperSubmittedJobGraphStore otherJobGraphs = new 
ZooKeeperSubmittedJobGraphStore(
-                               ZooKeeper.createClient(), 
"/testUpdateJobGraphYouDidNotGetOrAdd", localStateStorage);
+                               ZooKeeper.createClient(), 
"/testUpdateJobGraphYouDidNotGetOrAdd", localStateStorage, 
Executors.directExecutor());
 
                jobGraphs.start(null);
                otherJobGraphs.start(null);

http://git-wip-us.apache.org/repos/asf/flink/blob/3fb92d86/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ClusterShutdownITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ClusterShutdownITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ClusterShutdownITCase.java
index 3c8ea75..ddcb4e1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ClusterShutdownITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ClusterShutdownITCase.java
@@ -72,7 +72,12 @@ public class ClusterShutdownITCase extends TestLogger {
 
                        // start job manager which doesn't shutdown the actor system
                        ActorGateway jobManager =
-                               TestingUtils.createJobManager(system, 
system.dispatcher(), config, "jobmanager1");
+                               TestingUtils.createJobManager(
+                                       system,
+                                       system.dispatcher(),
+                                       system.dispatcher(),
+                                       config,
+                                       "jobmanager1");
 
                        // Tell the JobManager to inform us of shutdown actions
                        
jobManager.tell(TestingMessages.getNotifyOfComponentShutdown(), me);
@@ -114,7 +119,12 @@ public class ClusterShutdownITCase extends TestLogger {
 
                        // start job manager which doesn't shutdown the actor system
                        ActorGateway jobManager =
-                               TestingUtils.createJobManager(system, 
system.dispatcher(), config, "jobmanager2");
+                               TestingUtils.createJobManager(
+                                       system,
+                                       system.dispatcher(),
+                                       system.dispatcher(),
+                                       config,
+                                       "jobmanager2");
 
                        // Tell the JobManager to inform us of shutdown actions
                        
jobManager.tell(TestingMessages.getNotifyOfComponentShutdown(), me);

http://git-wip-us.apache.org/repos/asf/flink/blob/3fb92d86/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java
index 5a98c8d..e7828fc 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java
@@ -75,7 +75,12 @@ public class ResourceManagerITCase extends TestLogger {
                protected void run() {
 
                        ActorGateway jobManager =
-                               TestingUtils.createJobManager(system, 
system.dispatcher(), config, "ReconciliationTest");
+                               TestingUtils.createJobManager(
+                                       system,
+                                       system.dispatcher(),
+                                       system.dispatcher(),
+                                       config,
+                                       "ReconciliationTest");
                        ActorGateway me =
                                TestingUtils.createForwardingActor(system, 
getTestActor(), Option.<String>empty());
 
@@ -129,7 +134,12 @@ public class ResourceManagerITCase extends TestLogger {
                protected void run() {
 
                        ActorGateway jobManager =
-                               TestingUtils.createJobManager(system, 
system.dispatcher(), config, "RegTest");
+                               TestingUtils.createJobManager(
+                                       system,
+                                       system.dispatcher(),
+                                       system.dispatcher(),
+                                       config,
+                                       "RegTest");
                        ActorGateway me =
                                TestingUtils.createForwardingActor(system, 
getTestActor(), Option.<String>empty());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3fb92d86/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index 83eaddb..83983d4 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -86,6 +86,7 @@ public class TaskManagerComponentsStartupShutdownTest {
                                config,
                                actorSystem,
                                actorSystem.dispatcher(),
+                               actorSystem.dispatcher(),
                                JobManager.class,
                                MemoryArchivist.class)._1();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3fb92d86/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
index 63c1b29..dead732 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
@@ -103,6 +103,7 @@ public abstract class TaskManagerProcessReapingTestBase {
                                new Configuration(),
                                jmActorSystem,
                                jmActorSystem.dispatcher(),
+                               jmActorSystem.dispatcher(),
                                JobManager.class,
                                MemoryArchivist.class)._1;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3fb92d86/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
index b21eba0..5753349 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
@@ -106,7 +106,11 @@ public class TaskManagerRegistrationTest extends 
TestLogger {
 
                        try {
                                // a simple JobManager
-                               jobManager = createJobManager(actorSystem, 
actorSystem.dispatcher(), config);
+                               jobManager = createJobManager(
+                                       actorSystem,
+                                       actorSystem.dispatcher(),
+                                       actorSystem.dispatcher(),
+                                       config);
                                startResourceManager(config, 
jobManager.actor());
 
                                // start two TaskManagers. it will automatically try to register
@@ -189,6 +193,7 @@ public class TaskManagerRegistrationTest extends TestLogger 
{
                                jobManager = createJobManager(
                                        actorSystem,
                                        actorSystem.dispatcher(),
+                                       actorSystem.dispatcher(),
                                        new Configuration());
 
                                startResourceManager(config, 
jobManager.actor());
@@ -631,6 +636,7 @@ public class TaskManagerRegistrationTest extends TestLogger 
{
                        configuration,
                        actorSystem,
                        actorSystem.dispatcher(),
+                       actorSystem.dispatcher(),
                        NONE_STRING,
                        NONE_STRING,
                        JobManager.class,

http://git-wip-us.apache.org/repos/asf/flink/blob/3fb92d86/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java
index 8f9c932..5db3557 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java
@@ -22,6 +22,7 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.api.BackgroundCallback;
 import org.apache.curator.framework.api.CuratorEvent;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.state.RetrievableStateHandle;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.TestLogger;
@@ -84,7 +85,7 @@ public class ZooKeeperStateHandleStoreITCase extends 
TestLogger {
        public void testAdd() throws Exception {
                LongStateStorage longStateStorage = new LongStateStorage();
                ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<Long>(
-                               ZooKeeper.getClient(), longStateStorage);
+                               ZooKeeper.getClient(), longStateStorage, 
Executors.directExecutor());
 
                // Config
                final String pathInZooKeeper = "/testAdd";
@@ -119,7 +120,7 @@ public class ZooKeeperStateHandleStoreITCase extends 
TestLogger {
        public void testAddWithCreateMode() throws Exception {
                LongStateStorage longStateStorage = new LongStateStorage();
                ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<Long>(
-                               ZooKeeper.getClient(), longStateStorage);
+                               ZooKeeper.getClient(), longStateStorage, 
Executors.directExecutor());
 
                // Config
                Long state = 3457347234L;
@@ -181,7 +182,7 @@ public class ZooKeeperStateHandleStoreITCase extends 
TestLogger {
                LongStateStorage stateHandleProvider = new LongStateStorage();
 
                ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
-                               ZooKeeper.getClient(), stateHandleProvider);
+                               ZooKeeper.getClient(), stateHandleProvider, 
Executors.directExecutor());
 
                
ZooKeeper.getClient().create().forPath("/testAddAlreadyExistingPath");
 
@@ -200,7 +201,7 @@ public class ZooKeeperStateHandleStoreITCase extends 
TestLogger {
                when(client.create()).thenThrow(new RuntimeException("Expected 
test Exception."));
 
                ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
-                               client, stateHandleProvider);
+                               client, stateHandleProvider, 
Executors.directExecutor());
 
                // Config
                final String pathInZooKeeper = 
"/testAddDiscardStateHandleAfterFailure";
@@ -230,7 +231,7 @@ public class ZooKeeperStateHandleStoreITCase extends 
TestLogger {
                LongStateStorage stateHandleProvider = new LongStateStorage();
 
                ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
-                               ZooKeeper.getClient(), stateHandleProvider);
+                               ZooKeeper.getClient(), stateHandleProvider, 
Executors.directExecutor());
 
                // Config
                final String pathInZooKeeper = "/testReplace";
@@ -269,7 +270,7 @@ public class ZooKeeperStateHandleStoreITCase extends 
TestLogger {
                RetrievableStateStorageHelper<Long> stateStorage = new 
LongStateStorage();
 
                ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
-                               ZooKeeper.getClient(), stateStorage);
+                               ZooKeeper.getClient(), stateStorage, 
Executors.directExecutor());
 
                store.replace("/testReplaceNonExistingPath", 0, 1L);
        }
@@ -286,7 +287,7 @@ public class ZooKeeperStateHandleStoreITCase extends 
TestLogger {
                when(client.setData()).thenThrow(new RuntimeException("Expected 
test Exception."));
 
                ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
-                               client, stateHandleProvider);
+                               client, stateHandleProvider, 
Executors.directExecutor());
 
                // Config
                final String pathInZooKeeper = 
"/testReplaceDiscardStateHandleAfterFailure";
@@ -328,7 +329,7 @@ public class ZooKeeperStateHandleStoreITCase extends 
TestLogger {
                LongStateStorage stateHandleProvider = new LongStateStorage();
 
                ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
-                               ZooKeeper.getClient(), stateHandleProvider);
+                               ZooKeeper.getClient(), stateHandleProvider, 
Executors.directExecutor());
 
                // Config
                final String pathInZooKeeper = "/testGetAndExists";
@@ -353,7 +354,7 @@ public class ZooKeeperStateHandleStoreITCase extends 
TestLogger {
                LongStateStorage stateHandleProvider = new LongStateStorage();
 
                ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
-                               ZooKeeper.getClient(), stateHandleProvider);
+                               ZooKeeper.getClient(), stateHandleProvider, 
Executors.directExecutor());
 
                store.get("/testGetNonExistingPath");
        }
@@ -367,7 +368,7 @@ public class ZooKeeperStateHandleStoreITCase extends 
TestLogger {
                LongStateStorage stateHandleProvider = new LongStateStorage();
 
                ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
-                               ZooKeeper.getClient(), stateHandleProvider);
+                               ZooKeeper.getClient(), stateHandleProvider, 
Executors.directExecutor());
 
                // Config
                final String pathInZooKeeper = "/testGetAll";
@@ -398,7 +399,7 @@ public class ZooKeeperStateHandleStoreITCase extends 
TestLogger {
                LongStateStorage stateHandleProvider = new LongStateStorage();
 
                ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
-                               ZooKeeper.getClient(), stateHandleProvider);
+                               ZooKeeper.getClient(), stateHandleProvider, 
Executors.directExecutor());
 
                // Config
                final String pathInZooKeeper = "/testGetAllSortedByName";
@@ -428,7 +429,7 @@ public class ZooKeeperStateHandleStoreITCase extends 
TestLogger {
                LongStateStorage stateHandleProvider = new LongStateStorage();
 
                ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
-                               ZooKeeper.getClient(), stateHandleProvider);
+                               ZooKeeper.getClient(), stateHandleProvider, 
Executors.directExecutor());
 
                // Config
                final String pathInZooKeeper = "/testRemove";
@@ -452,7 +453,7 @@ public class ZooKeeperStateHandleStoreITCase extends 
TestLogger {
                LongStateStorage stateHandleProvider = new LongStateStorage();
 
                ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
-                               ZooKeeper.getClient(), stateHandleProvider);
+                               ZooKeeper.getClient(), stateHandleProvider, 
Executors.directExecutor());
 
                // Config
                final String pathInZooKeeper = "/testRemoveWithCallback";
@@ -491,7 +492,7 @@ public class ZooKeeperStateHandleStoreITCase extends 
TestLogger {
                LongStateStorage stateHandleProvider = new LongStateStorage();
 
                ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
-                               ZooKeeper.getClient(), stateHandleProvider);
+                               ZooKeeper.getClient(), stateHandleProvider, 
Executors.directExecutor());
 
                // Config
                final String pathInZooKeeper = "/testDiscard";
@@ -513,7 +514,7 @@ public class ZooKeeperStateHandleStoreITCase extends 
TestLogger {
                LongStateStorage stateHandleProvider = new LongStateStorage();
 
                ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
-                               ZooKeeper.getClient(), stateHandleProvider);
+                               ZooKeeper.getClient(), stateHandleProvider, 
Executors.directExecutor());
 
                // Config
                final String pathInZooKeeper = "/testDiscardAll";

http://git-wip-us.apache.org/repos/asf/flink/blob/3fb92d86/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
index 4485b65..cf00206 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
@@ -173,6 +173,7 @@ ImplicitSender with WordSpecLike with Matchers with 
BeforeAndAfterAll {
       new Configuration(),
       _system,
       _system.dispatcher,
+      _system.dispatcher,
       None,
       None,
       classOf[JobManager],

http://git-wip-us.apache.org/repos/asf/flink/blob/3fb92d86/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index b57a9dc..75891ed 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -303,18 +303,21 @@ object TestingUtils {
   /** Creates a testing JobManager using the default recovery mode (standalone)
     *
     * @param actorSystem The ActorSystem to use
-    * @param executor to run the JobManager's futures
+    * @param futureExecutor to run the JobManager's futures
+    * @param ioExecutor to run blocking io operations
     * @param configuration The Flink configuration
     * @return
     */
   def createJobManager(
       actorSystem: ActorSystem,
-      executor: Executor,
+      futureExecutor: Executor,
+      ioExecutor: Executor,
       configuration: Configuration)
     : ActorGateway = {
     createJobManager(
       actorSystem,
-      executor,
+      futureExecutor,
+      ioExecutor,
       configuration,
       classOf[TestingJobManager],
       ""
@@ -325,20 +328,23 @@ object TestingUtils {
     * Additional prefix can be supplied for the Actor system names
     *
     * @param actorSystem The ActorSystem to use
-    * @param executor to run the JobManager's futures
+    * @param futureExecutor to run the JobManager's futures
+    * @param ioExecutor to run blocking io operations
     * @param configuration The Flink configuration
     * @param prefix The prefix for the actor names
     * @return
     */
   def createJobManager(
       actorSystem: ActorSystem,
-      executor: Executor,
+      futureExecutor: Executor,
+      ioExecutor: Executor,
       configuration: Configuration,
       prefix: String)
     : ActorGateway = {
     createJobManager(
       actorSystem,
-      executor,
+      futureExecutor,
+      ioExecutor,
       configuration,
       classOf[TestingJobManager],
       prefix
@@ -349,19 +355,21 @@ object TestingUtils {
     * Creates a JobManager of the given class using the default recovery mode (standalone)
     *
     * @param actorSystem ActorSystem to use
-    * @param executor to run the JobManager's futures
+    * @param futureExecutor to run the JobManager's futures
+    * @param ioExecutor to run blocking io operations
     * @param configuration Configuration to use
     * @param jobManagerClass JobManager class to instantiate
     * @return
     */
   def createJobManager(
       actorSystem: ActorSystem,
-      executor: Executor,
+      futureExecutor: Executor,
+      ioExecutor: Executor,
       configuration: Configuration,
       jobManagerClass: Class[_ <: JobManager])
     : ActorGateway = {
 
-    createJobManager(actorSystem, executor, configuration, jobManagerClass, "")
+    createJobManager(actorSystem, futureExecutor, ioExecutor, configuration, jobManagerClass, "")
   }
 
   /**
@@ -369,7 +377,8 @@ object TestingUtils {
     * Additional prefix for the Actor names can be added.
     *
     * @param actorSystem ActorSystem to use
-    * @param executor to run the JobManager's futures
+    * @param futureExecutor to run the JobManager's futures
+    * @param ioExecutor to run blocking io operations
     * @param configuration Configuration to use
     * @param jobManagerClass JobManager class to instantiate
     * @param prefix The prefix to use for the Actor names
@@ -377,7 +386,8 @@ object TestingUtils {
     */
   def createJobManager(
       actorSystem: ActorSystem,
-      executor: Executor,
+      futureExecutor: Executor,
+      ioExecutor: Executor,
       configuration: Configuration,
       jobManagerClass: Class[_ <: JobManager],
       prefix: String)
@@ -390,7 +400,8 @@ object TestingUtils {
       val (actor, _) = JobManager.startJobManagerActors(
         configuration,
         actorSystem,
-        executor,
+        futureExecutor,
+        ioExecutor,
         Some(prefix + JobManager.JOB_MANAGER_NAME),
         Some(prefix + JobManager.ARCHIVE_NAME),
         jobManagerClass,

http://git-wip-us.apache.org/repos/asf/flink/blob/3fb92d86/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
index af86983..0ff2e78 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
@@ -130,6 +130,7 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
                                jmConfig,
                                jmActorSystem,
                                jmActorSystem.dispatcher(),
+                               jmActorSystem.dispatcher(),
                                JobManager.class,
                                MemoryArchivist.class)._1();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3fb92d86/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
index f72ef34..8243e97 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
@@ -105,6 +105,7 @@ public class ProcessFailureCancelingITCase {
                                jmConfig,
                                jmActorSystem,
                                jmActorSystem.dispatcher(),
+                               jmActorSystem.dispatcher(),
                                JobManager.class,
                                MemoryArchivist.class)._1();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3fb92d86/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index 002e162..da5959b 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -215,10 +215,14 @@ public class YarnApplicationMasterRunner {
 
                int numberProcessors = Hardware.getNumberCPUCores();
 
-               final ExecutorService executor = Executors.newFixedThreadPool(
+               final ExecutorService futureExecutor = Executors.newFixedThreadPool(
                        numberProcessors,
                        new NamedThreadFactory("yarn-jobmanager-future-", "-thread-"));
 
+               final ExecutorService ioExecutor = Executors.newFixedThreadPool(
+                       numberProcessors,
+                       new NamedThreadFactory("yarn-jobmanager-io-", "-thread-"));
+
                try {
                        // ------- (1) load and parse / validate all configurations -------
 
@@ -333,7 +337,8 @@ public class YarnApplicationMasterRunner {
                        ActorRef jobManager = JobManager.startJobManagerActors(
                                config,
                                actorSystem,
-                               executor,
+                               futureExecutor,
+                               ioExecutor,
                                new scala.Some<>(JobManager.JOB_MANAGER_NAME()),
                                scala.Option.<String>empty(),
                                getJobManagerClass(),
@@ -427,7 +432,8 @@ public class YarnApplicationMasterRunner {
                        }
                }
 
-               executor.shutdownNow();
+               futureExecutor.shutdownNow();
+               ioExecutor.shutdownNow();
 
                return 0;
        }

Reply via email to