[FLINK-5085] Execute CheckpointCoordinator's state discard calls asynchronously

The CheckpointCoordinator is now given an Executor which is used to execute the
state discard calls asynchronously. This prevents blocking operations from being
executed within the calling thread.

Shut down ExecutorServices gracefully

This closes #2825.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c590912c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c590912c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c590912c

Branch: refs/heads/master
Commit: c590912c93a4059b40452dfa6cffbdd4d58cac13
Parents: 3fb92d8
Author: Till Rohrmann <[email protected]>
Authored: Thu Nov 17 15:39:11 2016 +0100
Committer: Till Rohrmann <[email protected]>
Committed: Tue Nov 22 23:00:17 2016 +0100

----------------------------------------------------------------------
 .../MesosApplicationMasterRunner.java           |  69 +-
 .../clusterframework/MesosJobManager.scala      |  34 +-
 .../webmonitor/BackPressureStatsTracker.java    |   2 +-
 .../BackPressureStatsTrackerTest.java           |   2 +-
 .../checkpoint/CheckpointCoordinator.java       | 104 ++-
 .../runtime/checkpoint/PendingCheckpoint.java   |  31 +-
 .../flink/runtime/concurrent/Executors.java     |  49 ++
 .../runtime/executiongraph/ExecutionGraph.java  |  51 +-
 .../executiongraph/ExecutionGraphBuilder.java   |  28 +-
 .../runtime/executiongraph/ExecutionVertex.java |   4 +-
 .../restart/FailureRateRestartStrategy.java     |   2 +-
 .../restart/FixedDelayRestartStrategy.java      |   2 +-
 .../ContaineredJobManager.scala                 |   9 +-
 .../flink/runtime/jobmanager/JobManager.scala   |  30 +-
 .../runtime/minicluster/FlinkMiniCluster.scala  |  11 +-
 .../minicluster/LocalFlinkMiniCluster.scala     |  38 +-
 .../checkpoint/CheckpointCoordinatorTest.java   | 679 ++++++++++---------
 .../checkpoint/CheckpointStateRestoreTest.java  |  83 +--
 ...ExecutionGraphCheckpointCoordinatorTest.java |   1 +
 .../checkpoint/PendingCheckpointTest.java       |  11 +-
 .../executiongraph/AllVerticesIteratorTest.java |   2 +-
 .../ArchivedExecutionGraphTest.java             |   1 +
 .../ExecutionGraphConstructionTest.java         |  24 +-
 .../ExecutionGraphDeploymentTest.java           |   7 +-
 .../ExecutionGraphMetricsTest.java              |   1 +
 .../ExecutionGraphRestartTest.java              |  17 +-
 .../ExecutionGraphSignalsTest.java              |   1 +
 .../executiongraph/ExecutionGraphTestUtils.java |   1 +
 .../ExecutionStateProgressTest.java             |   3 +-
 .../executiongraph/PointwisePatternTest.java    |  21 +-
 .../TerminalStateDeadlockTest.java              |   1 +
 .../executiongraph/VertexSlotSharingTest.java   |  15 +-
 .../restart/FixedDelayRestartStrategyTest.java  |   2 +-
 .../jobmanager/JobManagerHARecoveryTest.java    |  38 +-
 .../JobManagerLeaderElectionTest.java           |  29 +-
 .../TaskManagerLossFailsTasksTest.scala         |   1 +
 .../runtime/testingUtils/TestingCluster.scala   |   8 +-
 .../testingUtils/TestingJobManager.scala        |   6 +-
 .../partitioner/RescalePartitionerTest.java     |   1 +
 .../flink/yarn/TestingYarnJobManager.scala      |   9 +-
 .../flink/yarn/YarnApplicationMasterRunner.java |  17 +-
 .../org/apache/flink/yarn/YarnJobManager.scala  |   9 +-
 42 files changed, 834 insertions(+), 620 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
index 166218f..3695578 100644
--- 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
@@ -65,6 +65,7 @@ import scala.concurrent.duration.FiniteDuration;
 import java.io.File;
 import java.net.InetAddress;
 import java.net.URL;
+import java.net.UnknownHostException;
 import java.security.PrivilegedAction;
 import java.util.Map;
 import java.util.UUID;
@@ -172,43 +173,51 @@ public class MesosApplicationMasterRunner {
                WebMonitor webMonitor = null;
                MesosArtifactServer artifactServer = null;
 
-               int numberProcessors = Hardware.getNumberCPUCores();
+               // ------- (1) load and parse / validate all configurations 
-------
 
-               final ExecutorService futureExecutor = 
Executors.newFixedThreadPool(
-                       numberProcessors,
-                       new NamedThreadFactory("mesos-jobmanager-future-", 
"-thread-"));
+               // loading all config values here has the advantage that the 
program fails fast, if any
+               // configuration problem occurs
 
-               final ExecutorService ioExecutor = Executors.newFixedThreadPool(
-                       numberProcessors,
-                       new NamedThreadFactory("mesos-jobmanager-io-", 
"-thread-"));
+               final String workingDir = 
ENV.get(MesosConfigKeys.ENV_MESOS_SANDBOX);
+               checkState(workingDir != null, "Sandbox directory variable (%s) 
not set", MesosConfigKeys.ENV_MESOS_SANDBOX);
+
+               final String sessionID = 
ENV.get(MesosConfigKeys.ENV_SESSION_ID);
+               checkState(sessionID != null, "Session ID (%s) not set", 
MesosConfigKeys.ENV_SESSION_ID);
+
+               // Note that we use the "appMasterHostname" given by the 
system, to make sure
+               // we use the hostnames consistently throughout akka.
+               // for akka "localhost" and "localhost.localdomain" are 
different actors.
+               final String appMasterHostname;
 
                try {
-                       // ------- (1) load and parse / validate all 
configurations -------
+                       appMasterHostname = 
InetAddress.getLocalHost().getHostName();
+               } catch (UnknownHostException uhe) {
+                       LOG.error("Could not retrieve the local hostname.", 
uhe);
 
-                       // loading all config values here has the advantage 
that the program fails fast, if any
-                       // configuration problem occurs
+                       return INIT_ERROR_EXIT_CODE;
+               }
 
-                       final String workingDir = 
ENV.get(MesosConfigKeys.ENV_MESOS_SANDBOX);
-                       checkState(workingDir != null, "Sandbox directory 
variable (%s) not set", MesosConfigKeys.ENV_MESOS_SANDBOX);
+               // Flink configuration
+               final Configuration dynamicProperties =
+                       
FlinkMesosSessionCli.decodeDynamicProperties(ENV.get(MesosConfigKeys.ENV_DYNAMIC_PROPERTIES));
+               LOG.debug("Mesos dynamic properties: {}", dynamicProperties);
 
-                       final String sessionID = 
ENV.get(MesosConfigKeys.ENV_SESSION_ID);
-                       checkState(sessionID != null, "Session ID (%s) not 
set", MesosConfigKeys.ENV_SESSION_ID);
+               final Configuration config = createConfiguration(workingDir, 
dynamicProperties);
 
-                       // Note that we use the "appMasterHostname" given by 
the system, to make sure
-                       // we use the hostnames consistently throughout akka.
-                       // for akka "localhost" and "localhost.localdomain" are 
different actors.
-                       final String appMasterHostname = 
InetAddress.getLocalHost().getHostName();
+               // Mesos configuration
+               final MesosConfiguration mesosConfig = 
createMesosConfig(config, appMasterHostname);
 
-                       // Flink configuration
-                       final Configuration dynamicProperties =
-                               
FlinkMesosSessionCli.decodeDynamicProperties(ENV.get(MesosConfigKeys.ENV_DYNAMIC_PROPERTIES));
-                       LOG.debug("Mesos dynamic properties: {}", 
dynamicProperties);
+               int numberProcessors = Hardware.getNumberCPUCores();
 
-                       final Configuration config = 
createConfiguration(workingDir, dynamicProperties);
+               final ExecutorService futureExecutor = 
Executors.newFixedThreadPool(
+                       numberProcessors,
+                       new NamedThreadFactory("mesos-jobmanager-future-", 
"-thread-"));
 
-                       // Mesos configuration
-                       final MesosConfiguration mesosConfig = 
createMesosConfig(config, appMasterHostname);
+               final ExecutorService ioExecutor = Executors.newFixedThreadPool(
+                       numberProcessors,
+                       new NamedThreadFactory("mesos-jobmanager-io-", 
"-thread-"));
 
+               try {
                        // environment values related to TM
                        final int taskManagerContainerMemory;
                        final int numInitialTaskManagers;
@@ -380,6 +389,9 @@ public class MesosApplicationMasterRunner {
                                }
                        }
 
+                       futureExecutor.shutdownNow();
+                       ioExecutor.shutdownNow();
+
                        return INIT_ERROR_EXIT_CODE;
                }
 
@@ -404,8 +416,11 @@ public class MesosApplicationMasterRunner {
                        LOG.error("Failed to stop the artifact server", t);
                }
 
-               futureExecutor.shutdownNow();
-               ioExecutor.shutdownNow();
+               org.apache.flink.runtime.concurrent.Executors.gracefulShutdown(
+                       AkkaUtils.getTimeout(config).toMillis(),
+                       TimeUnit.MILLISECONDS,
+                       futureExecutor,
+                       ioExecutor);
 
                return 0;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala
 
b/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala
index 300539c..38886f8 100644
--- 
a/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala
+++ 
b/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala
@@ -37,8 +37,9 @@ import scala.concurrent.duration._
 /** JobManager actor for execution on Mesos. .
   *
   * @param flinkConfiguration Configuration object for the actor
-  * @param executor Execution context which is used to execute concurrent 
tasks in the
+  * @param futureExecutor Execution context which is used to execute 
concurrent tasks in the
   *                         
[[org.apache.flink.runtime.executiongraph.ExecutionGraph]]
+  * @param ioExecutor for blocking io operations
   * @param instanceManager Instance manager to manage the registered
   *                        [[org.apache.flink.runtime.taskmanager.TaskManager]]
   * @param scheduler Scheduler to schedule Flink jobs
@@ -48,22 +49,25 @@ import scala.concurrent.duration._
   * @param timeout Timeout for futures
   * @param leaderElectionService LeaderElectionService to participate in the 
leader election
   */
-class MesosJobManager(flinkConfiguration: FlinkConfiguration,
-                      executor: Executor,
-                      instanceManager: InstanceManager,
-                      scheduler: FlinkScheduler,
-                      libraryCacheManager: BlobLibraryCacheManager,
-                      archive: ActorRef,
-                      restartStrategyFactory: RestartStrategyFactory,
-                      timeout: FiniteDuration,
-                      leaderElectionService: LeaderElectionService,
-                      submittedJobGraphs : SubmittedJobGraphStore,
-                      checkpointRecoveryFactory : CheckpointRecoveryFactory,
-                      jobRecoveryTimeout: FiniteDuration,
-                      metricsRegistry: Option[FlinkMetricRegistry])
+class MesosJobManager(
+    flinkConfiguration: FlinkConfiguration,
+    futureExecutor: Executor,
+    ioExecutor: Executor,
+    instanceManager: InstanceManager,
+    scheduler: FlinkScheduler,
+    libraryCacheManager: BlobLibraryCacheManager,
+    archive: ActorRef,
+    restartStrategyFactory: RestartStrategyFactory,
+    timeout: FiniteDuration,
+    leaderElectionService: LeaderElectionService,
+    submittedJobGraphs : SubmittedJobGraphStore,
+    checkpointRecoveryFactory : CheckpointRecoveryFactory,
+    jobRecoveryTimeout: FiniteDuration,
+    metricsRegistry: Option[FlinkMetricRegistry])
   extends ContaineredJobManager(
     flinkConfiguration,
-    executor,
+    futureExecutor,
+    ioExecutor,
     instanceManager,
     scheduler,
     libraryCacheManager,

http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java
index 3702eb4..97de89b 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java
@@ -165,7 +165,7 @@ public class BackPressureStatsTracker {
                        if (!pendingStats.contains(vertex) &&
                                        
!vertex.getGraph().getState().isGloballyTerminalState()) {
 
-                               Executor executor = 
vertex.getGraph().getExecutor();
+                               Executor executor = 
vertex.getGraph().getFutureExecutor();
 
                                // Only trigger if still active job
                                if (executor != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerTest.java
index 7ac2a69..c7e303d 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerTest.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerTest.java
@@ -66,7 +66,7 @@ public class BackPressureStatsTrackerTest {
                when(graph.getState()).thenReturn(JobStatus.RUNNING);
 
                // Same Thread execution context
-               when(graph.getExecutor()).thenReturn(new Executor() {
+               when(graph.getFutureExecutor()).thenReturn(new Executor() {
 
                        @Override
                        public void execute(Runnable runnable) {

http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 886409d..638e0a7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
+import org.apache.flink.runtime.state.StateObject;
 import org.apache.flink.runtime.state.TaskStateHandles;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,6 +48,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Timer;
 import java.util.TimerTask;
+import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -150,6 +152,8 @@ public class CheckpointCoordinator {
        /** Default checkpoint properties **/
        private final CheckpointProperties checkpointProperties;
 
+       private final Executor executor;
+
        // 
--------------------------------------------------------------------------------------------
 
        public CheckpointCoordinator(
@@ -165,7 +169,8 @@ public class CheckpointCoordinator {
                        CheckpointIDCounter checkpointIDCounter,
                        CompletedCheckpointStore completedCheckpointStore,
                        String checkpointDirectory,
-                       CheckpointStatsTracker statsTracker) {
+                       CheckpointStatsTracker statsTracker,
+                       Executor executor) {
 
                // sanity checks
                checkArgument(baseInterval > 0, "Checkpoint timeout must be 
larger than zero");
@@ -216,6 +221,8 @@ public class CheckpointCoordinator {
                } catch (Throwable t) {
                        throw new RuntimeException("Failed to start checkpoint 
ID counter: " + t.getMessage(), t);
                }
+
+               this.executor = checkNotNull(executor);
        }
 
        // 
--------------------------------------------------------------------------------------------
@@ -296,7 +303,7 @@ public class CheckpointCoordinator {
         * the checkpoint will be declined.
         * @return <code>true</code> if triggering the checkpoint succeeded.
         */
-       public boolean triggerCheckpoint(long timestamp, boolean isPeriodic) 
throws Exception {
+       public boolean triggerCheckpoint(long timestamp, boolean isPeriodic) {
                return triggerCheckpoint(timestamp, checkpointProperties, 
checkpointDirectory, isPeriodic).isSuccess();
        }
 
@@ -305,7 +312,7 @@ public class CheckpointCoordinator {
                        long timestamp,
                        CheckpointProperties props,
                        String targetDirectory,
-                       boolean isPeriodic) throws Exception {
+                       boolean isPeriodic) {
 
                // Sanity check
                if (props.externalizeCheckpoint() && targetDirectory == null) {
@@ -415,36 +422,32 @@ public class CheckpointCoordinator {
                        }
 
                        final PendingCheckpoint checkpoint = new 
PendingCheckpoint(
-                                       job,
-                                       checkpointID,
-                                       timestamp,
-                                       ackTasks,
-                                       isPeriodic,
-                                       props,
-                                       targetDirectory);
+                               job,
+                               checkpointID,
+                               timestamp,
+                               ackTasks,
+                               isPeriodic,
+                               props,
+                               targetDirectory,
+                               executor);
 
                        // schedule the timer that will clean up the expired 
checkpoints
                        TimerTask canceller = new TimerTask() {
                                @Override
                                public void run() {
-                                       try {
-                                               synchronized (lock) {
-                                                       // only do the work if 
the checkpoint is not discarded anyways
-                                                       // note that checkpoint 
completion discards the pending checkpoint object
-                                                       if 
(!checkpoint.isDiscarded()) {
-                                                               
LOG.info("Checkpoint " + checkpointID + " expired before completing.");
-
-                                                               
checkpoint.abortExpired();
-                                                               
pendingCheckpoints.remove(checkpointID);
-                                                               
rememberRecentCheckpointId(checkpointID);
-
-                                                               
triggerQueuedRequests();
-                                                       }
+                                       synchronized (lock) {
+                                               // only do the work if the 
checkpoint is not discarded anyways
+                                               // note that checkpoint 
completion discards the pending checkpoint object
+                                               if (!checkpoint.isDiscarded()) {
+                                                       LOG.info("Checkpoint " 
+ checkpointID + " expired before completing.");
+
+                                                       
checkpoint.abortExpired();
+                                                       
pendingCheckpoints.remove(checkpointID);
+                                                       
rememberRecentCheckpointId(checkpointID);
+
+                                                       triggerQueuedRequests();
                                                }
                                        }
-                                       catch (Throwable t) {
-                                               LOG.error("Exception while 
handling checkpoint timeout", t);
-                                       }
                                }
                        };
 
@@ -531,7 +534,7 @@ public class CheckpointCoordinator {
         *
         * @param message Checkpoint decline from the task manager
         */
-       public void receiveDeclineMessage(DeclineCheckpoint message) throws 
Exception {
+       public void receiveDeclineMessage(DeclineCheckpoint message) {
                if (shutdown || message == null) {
                        return;
                }
@@ -675,12 +678,8 @@ public class CheckpointCoordinator {
                                                                "the state 
handle to avoid lingering state.", message.getCheckpointId(),
                                                        
message.getTaskExecutionId(), message.getJob());
 
-                                               try {
-                                                       
message.getSubtaskState().discardState();
-                                               } catch (Exception e) {
-                                                       LOG.warn("Could not 
properly discard state for checkpoint {} of task {} of job {}.",
-                                                               
message.getCheckpointId(), message.getTaskExecutionId(), message.getJob(), e);
-                                               }
+                                               
discardState(message.getSubtaskState());
+
                                                break;
                                        case DISCARDED:
                                                LOG.warn("Could not acknowledge 
the checkpoint {} for task {} of job {}, " +
@@ -688,12 +687,7 @@ public class CheckpointCoordinator {
                                                                "state handle 
tp avoid lingering state.",
                                                        
message.getCheckpointId(), message.getTaskExecutionId(), message.getJob());
 
-                                               try {
-                                                       
message.getSubtaskState().discardState();
-                                               } catch (Exception e) {
-                                                       LOG.warn("Could not 
properly discard state for checkpoint {} of task {} of job {}.",
-                                                               
message.getCheckpointId(), message.getTaskExecutionId(), message.getJob(), e);
-                                               }
+                                               
discardState(message.getSubtaskState());
                                }
                        }
                        else if (checkpoint != null) {
@@ -712,13 +706,8 @@ public class CheckpointCoordinator {
                                        isPendingCheckpoint = false;
                                }
 
-                               try {
-                                       // try to discard the state so that we 
don't have lingering state lying around
-                                       
message.getSubtaskState().discardState();
-                               } catch (Exception e) {
-                                       LOG.warn("Could not properly discard 
state for checkpoint {} of task {} of job {}.",
-                                               message.getCheckpointId(), 
message.getTaskExecutionId(), message.getJob(), e);
-                               }
+                               // try to discard the state so that we don't 
have lingering state lying around
+                               discardState(message.getSubtaskState());
                        }
                }
 
@@ -747,7 +736,7 @@ public class CheckpointCoordinator {
                recentPendingCheckpoints.addLast(id);
        }
 
-       private void dropSubsumedCheckpoints(long checkpointId) throws 
Exception {
+       private void dropSubsumedCheckpoints(long checkpointId) {
                Iterator<Map.Entry<Long, PendingCheckpoint>> entries = 
pendingCheckpoints.entrySet().iterator();
 
                while (entries.hasNext()) {
@@ -766,7 +755,7 @@ public class CheckpointCoordinator {
         *
         * <p>NOTE: The caller of this method must hold the lock when invoking 
the method!
         */
-       private void triggerQueuedRequests() throws Exception {
+       private void triggerQueuedRequests() {
                if (triggerRequestQueued) {
                        triggerRequestQueued = false;
 
@@ -915,11 +904,7 @@ public class CheckpointCoordinator {
                        }
 
                        for (PendingCheckpoint p : pendingCheckpoints.values()) 
{
-                               try {
-                                       p.abortError(new Exception("Checkpoint 
Coordinator is suspending."));
-                               } catch (Throwable t) {
-                                       LOG.error("Error while disposing 
pending checkpoint", t);
-                               }
+                               p.abortError(new Exception("Checkpoint 
Coordinator is suspending."));
                        }
 
                        pendingCheckpoints.clear();
@@ -959,4 +944,17 @@ public class CheckpointCoordinator {
                        }
                }
        }
+
+       private void discardState(final StateObject stateObject) {
+               executor.execute(new Runnable() {
+                       @Override
+                       public void run() {
+                               try {
+                                       stateObject.discardState();
+                               } catch (Exception e) {
+                                       LOG.warn("Could not properly discard 
state object.", e);
+                               }
+                       }
+               });
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 5034502..cfb59f6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.state.ChainedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.StateUtil;
 import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,6 +39,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.Executor;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -84,6 +86,8 @@ public class PendingCheckpoint {
        /** The promise to fulfill once the checkpoint has been completed. */
        private final FlinkCompletableFuture<CompletedCheckpoint> 
onCompletionPromise = new FlinkCompletableFuture<>();
 
+       private final Executor executor;
+
        private int numAcknowledgedTasks;
 
        private boolean discarded;
@@ -97,7 +101,8 @@ public class PendingCheckpoint {
                        Map<ExecutionAttemptID, ExecutionVertex> 
verticesToConfirm,
                        boolean isPeriodic,
                        CheckpointProperties props,
-                       String targetDirectory) {
+                       String targetDirectory,
+                       Executor executor) {
                this.jobId = checkNotNull(jobId);
                this.checkpointId = checkpointId;
                this.checkpointTimestamp = checkpointTimestamp;
@@ -106,6 +111,7 @@ public class PendingCheckpoint {
                this.taskStates = new HashMap<>();
                this.props = checkNotNull(props);
                this.targetDirectory = targetDirectory;
+               this.executor = Preconditions.checkNotNull(executor);
 
                // Sanity check
                if (props.externalizeCheckpoint() && targetDirectory == null) {
@@ -324,7 +330,7 @@ public class PendingCheckpoint {
        /**
         * Aborts a checkpoint because it expired (took too long).
         */
-       public void abortExpired() throws Exception {
+       public void abortExpired() {
                try {
                        onCompletionPromise.completeExceptionally(new 
Exception("Checkpoint expired before completing"));
                } finally {
@@ -335,7 +341,7 @@ public class PendingCheckpoint {
        /**
         * Aborts the pending checkpoint because a newer completed checkpoint 
subsumed it.
         */
-       public void abortSubsumed() throws Exception {
+       public void abortSubsumed() {
                try {
                        if (props.forceCheckpoint()) {
                                onCompletionPromise.completeExceptionally(new 
Exception("Bug: forced checkpoints must never be subsumed"));
@@ -349,7 +355,7 @@ public class PendingCheckpoint {
                }
        }
 
-       public void abortDeclined() throws Exception {
+       public void abortDeclined() {
                try {
                        onCompletionPromise.completeExceptionally(new 
Exception("Checkpoint was declined (tasks not ready)"));
                } finally {
@@ -361,7 +367,7 @@ public class PendingCheckpoint {
         * Aborts the pending checkpoint due to an error.
         * @param cause The error's exception.
         */
-       public void abortError(Throwable cause) throws Exception {
+       public void abortError(Throwable cause) {
                try {
                        onCompletionPromise.completeExceptionally(new 
Exception("Checkpoint failed: " + cause.getMessage(), cause));
                } finally {
@@ -369,13 +375,24 @@ public class PendingCheckpoint {
                }
        }
 
-       private void dispose(boolean releaseState) throws Exception {
+       private void dispose(boolean releaseState) {
                synchronized (lock) {
                        try {
                                discarded = true;
                                numAcknowledgedTasks = -1;
                                if (releaseState) {
-                                       
StateUtil.bestEffortDiscardAllStateObjects(taskStates.values());
+                                       executor.execute(new Runnable() {
+                                               @Override
+                                               public void run() {
+                                                       try {
+                                                               
StateUtil.bestEffortDiscardAllStateObjects(taskStates.values());
+                                                       } catch (Exception e) {
+                                                               LOG.warn("Could 
not properly dispose the pending checkpoint " +
+                                                                       "{} of 
job {}.", checkpointId, jobId, e);
+                                                       }
+                                               }
+                                       });
+
                                }
                        } finally {
                                taskStates.clear();

http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
index 1832d70..391f233 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
@@ -18,13 +18,20 @@
 
 package org.apache.flink.runtime.concurrent;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Collection of {@link Executor} implementations
  */
 public class Executors {
 
+       private static final Logger LOG = 
LoggerFactory.getLogger(Executors.class);
+
        /**
         * Return a direct executor. The direct executor directly executes the 
runnable in the calling
         * thread.
@@ -49,4 +56,46 @@ public class Executors {
                        command.run();
                }
        }
+
+       /**
+        * Gracefully shutdown the given {@link ExecutorService}. The call 
waits the given timeout that
+        * all ExecutorServices terminate. If the ExecutorServices do not 
terminate in this time,
+        * they will be shut down hard.
+        *
+        * @param timeout to wait for the termination of all ExecutorServices
+        * @param unit of the timeout
+        * @param executorServices to shut down
+        */
+       public static void gracefulShutdown(long timeout, TimeUnit unit, 
ExecutorService... executorServices) {
+               for (ExecutorService executorService: executorServices) {
+                       executorService.shutdown();
+               }
+
+               boolean wasInterrupted = false;
+               final long endTime = unit.toMillis(timeout) + 
System.currentTimeMillis();
+               long timeLeft = unit.toMillis(timeout);
+               boolean hasTimeLeft = timeLeft > 0L;
+
+               for (ExecutorService executorService: executorServices) {
+                       if (wasInterrupted || !hasTimeLeft) {
+                               executorService.shutdownNow();
+                       } else {
+                               try {
+                                       if 
(!executorService.awaitTermination(timeLeft, TimeUnit.MILLISECONDS)) {
+                                               LOG.warn("ExecutorService did 
not terminate in time. Shutting it down now.");
+                                               executorService.shutdownNow();
+                                       }
+                               } catch (InterruptedException e) {
+                                       LOG.warn("Interrupted while shutting 
down executor services. Shutting all " +
+                                               "remaining ExecutorServices 
down now.", e);
+                                       executorService.shutdownNow();
+
+                                       wasInterrupted = true;
+                               }
+
+                               timeLeft = endTime - System.currentTimeMillis();
+                               hasTimeLeft = timeLeft > 0L;
+                       }
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index f8e894a..cbb4c7e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -58,6 +58,7 @@ import 
org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.util.SerializableObject;
 import org.apache.flink.runtime.util.SerializedThrowable;
 import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -203,7 +204,10 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
        private CheckpointStatsTracker checkpointStatsTracker;
 
        /** The executor which is used to execute futures. */
-       private Executor executor;
+       private final Executor futureExecutor;
+
+       /** The executor which is used to execute blocking io operations */
+       private final Executor ioExecutor;
 
        /** Registered KvState instances reported by the TaskManagers. */
        private KvStateLocationRegistry kvStateLocationRegistry;
@@ -219,7 +223,8 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
         * This constructor is for tests only, because it does not include 
class loading information.
         */
        ExecutionGraph(
-                       Executor executor,
+                       Executor futureExecutor,
+                       Executor ioExecutor,
                        JobID jobId,
                        String jobName,
                        Configuration jobConfig,
@@ -227,7 +232,8 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
                        Time timeout,
                        RestartStrategy restartStrategy) throws IOException {
                this(
-                       executor,
+                       futureExecutor,
+                       ioExecutor,
                        jobId,
                        jobName,
                        jobConfig,
@@ -242,7 +248,8 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
        }
 
        public ExecutionGraph(
-                       Executor executor,
+                       Executor futureExecutor,
+                       Executor ioExecutor,
                        JobID jobId,
                        String jobName,
                        Configuration jobConfig,
@@ -254,7 +261,7 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
                        ClassLoader userClassLoader,
                        MetricGroup metricGroup) throws IOException {
 
-               checkNotNull(executor);
+               checkNotNull(futureExecutor);
                checkNotNull(jobId);
                checkNotNull(jobName);
                checkNotNull(jobConfig);
@@ -271,7 +278,8 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
                // serialize the job information to do the serialisation work 
only once
                this.serializedJobInformation = new 
SerializedValue<>(jobInformation);
 
-               this.executor = executor;
+               this.futureExecutor = 
Preconditions.checkNotNull(futureExecutor);
+               this.ioExecutor = Preconditions.checkNotNull(ioExecutor);
 
                this.userClassLoader = userClassLoader;
 
@@ -365,19 +373,20 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
 
                // create the coordinator that triggers and commits checkpoints 
and holds the state
                checkpointCoordinator = new CheckpointCoordinator(
-                               jobInformation.getJobId(),
-                               interval,
-                               checkpointTimeout,
-                               minPauseBetweenCheckpoints,
-                               maxConcurrentCheckpoints,
-                               externalizeSettings,
-                               tasksToTrigger,
-                               tasksToWaitFor,
-                               tasksToCommitTo,
-                               checkpointIDCounter,
-                               checkpointStore,
-                               checkpointDir,
-                               checkpointStatsTracker);
+                       jobInformation.getJobId(),
+                       interval,
+                       checkpointTimeout,
+                       minPauseBetweenCheckpoints,
+                       maxConcurrentCheckpoints,
+                       externalizeSettings,
+                       tasksToTrigger,
+                       tasksToWaitFor,
+                       tasksToCommitTo,
+                       checkpointIDCounter,
+                       checkpointStore,
+                       checkpointDir,
+                       checkpointStatsTracker,
+                       ioExecutor);
 
                // interval of max long value indicates disable periodic 
checkpoint,
                // the CheckpointActivatorDeactivator should be created only if 
the interval is not max value
@@ -589,8 +598,8 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
         *
         * @return ExecutionContext associated with this ExecutionGraph
         */
-       public Executor getExecutor() {
-               return executor;
+       public Executor getFutureExecutor() {
+               return futureExecutor;
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index 3be1d56..a1d7385 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -53,7 +53,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  * Utility class to encapsulate the logic of building an {@link 
ExecutionGraph} from a {@link JobGraph}.
  */
 public class ExecutionGraphBuilder {
-               /**
+       /**
         * Builds the ExecutionGraph from the JobGraph.
         * If a prior execution graph exists, the JobGraph will be attached. If 
no prior execution
         * graph exists, then the JobGraph will become attach to a new emoty 
execution graph.
@@ -62,7 +62,8 @@ public class ExecutionGraphBuilder {
                        @Nullable ExecutionGraph prior,
                        JobGraph jobGraph,
                        Configuration jobManagerConfig,
-                       Executor executor,
+                       Executor futureExecutor,
+                       Executor ioExecutor,
                        ClassLoader classLoader,
                        CheckpointRecoveryFactory recoveryFactory,
                        Time timeout,
@@ -83,17 +84,18 @@ public class ExecutionGraphBuilder {
                try {
                        executionGraph = (prior != null) ? prior :
                                        new ExecutionGraph(
-                                                       executor,
-                                                       jobId,
-                                                       jobName,
-                                                       
jobGraph.getJobConfiguration(),
-                                                       
jobGraph.getSerializedExecutionConfig(),
-                                                       timeout,
-                                                       restartStrategy,
-                                                       
jobGraph.getUserJarBlobKeys(),
-                                                       
jobGraph.getClasspaths(),
-                                                       classLoader,
-                                                       metrics);
+                                               futureExecutor,
+                                               ioExecutor,
+                                               jobId,
+                                               jobName,
+                                               jobGraph.getJobConfiguration(),
+                                               
jobGraph.getSerializedExecutionConfig(),
+                                               timeout,
+                                               restartStrategy,
+                                               jobGraph.getUserJarBlobKeys(),
+                                               jobGraph.getClasspaths(),
+                                               classLoader,
+                                               metrics);
                } catch (IOException e) {
                        throw new JobException("Could not create the execution 
graph.", e);
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 01e8660..39c60d2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -128,7 +128,7 @@ public class ExecutionVertex implements 
AccessExecutionVertex, Archiveable<Archi
                this.priorExecutions = new CopyOnWriteArrayList<Execution>();
 
                this.currentExecution = new Execution(
-                       getExecutionGraph().getExecutor(),
+                       getExecutionGraph().getFutureExecutor(),
                        this,
                        0,
                        createTimestamp,
@@ -435,7 +435,7 @@ public class ExecutionVertex implements 
AccessExecutionVertex, Archiveable<Archi
                        if (state == FINISHED || state == CANCELED || state == 
FAILED) {
                                priorExecutions.add(execution);
                                currentExecution = new Execution(
-                                       getExecutionGraph().getExecutor(),
+                                       getExecutionGraph().getFutureExecutor(),
                                        this,
                                        execution.getAttemptNumber()+1,
                                        System.currentTimeMillis(),

http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java
index 3962e91..10546a2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java
@@ -70,7 +70,7 @@ public class FailureRateRestartStrategy implements 
RestartStrategy {
                        restartTimestampsDeque.remove();
                }
                restartTimestampsDeque.add(System.currentTimeMillis());
-               
FlinkFuture.supplyAsync(ExecutionGraphRestarter.restartWithDelay(executionGraph,
 delayInterval.toMilliseconds()), executionGraph.getExecutor());
+               
FlinkFuture.supplyAsync(ExecutionGraphRestarter.restartWithDelay(executionGraph,
 delayInterval.toMilliseconds()), executionGraph.getFutureExecutor());
        }
 
        private boolean isRestartTimestampsQueueFull() {

http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
index 5337c6a..f51ea7c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
@@ -58,7 +58,7 @@ public class FixedDelayRestartStrategy implements 
RestartStrategy {
        @Override
        public void restart(final ExecutionGraph executionGraph) {
                currentRestartAttempt++;
-               
FlinkFuture.supplyAsync(ExecutionGraphRestarter.restartWithDelay(executionGraph,
 delayBetweenRestartAttempts), executionGraph.getExecutor());
+               
FlinkFuture.supplyAsync(ExecutionGraphRestarter.restartWithDelay(executionGraph,
 delayBetweenRestartAttempts), executionGraph.getFutureExecutor());
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala
index 0f31eba..cbe80f1 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala
@@ -45,8 +45,9 @@ import scala.language.postfixOps
   * to start/administer/stop the session.
   *
   * @param flinkConfiguration Configuration object for the actor
-  * @param executor Execution context which is used to execute concurrent 
tasks in the
+  * @param futureExecutor Execution context which is used to execute 
concurrent tasks in the
   *                         
[[org.apache.flink.runtime.executiongraph.ExecutionGraph]]
+  * @param ioExecutor to execute blocking io operations
   * @param instanceManager Instance manager to manage the registered
   *                        [[org.apache.flink.runtime.taskmanager.TaskManager]]
   * @param scheduler Scheduler to schedule Flink jobs
@@ -58,7 +59,8 @@ import scala.language.postfixOps
   */
 abstract class ContaineredJobManager(
     flinkConfiguration: Configuration,
-    executor: Executor,
+    futureExecutor: Executor,
+    ioExecutor: Executor,
     instanceManager: InstanceManager,
     scheduler: FlinkScheduler,
     libraryCacheManager: BlobLibraryCacheManager,
@@ -72,7 +74,8 @@ abstract class ContaineredJobManager(
     metricsRegistry: Option[FlinkMetricRegistry])
   extends JobManager(
     flinkConfiguration,
-    executor,
+    futureExecutor,
+    ioExecutor,
     instanceManager,
     scheduler,
     libraryCacheManager,

http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 08ed0a4..197456f 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -44,7 +44,7 @@ import 
org.apache.flink.runtime.clusterframework.FlinkResourceManager
 import org.apache.flink.runtime.clusterframework.messages._
 import 
org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager
 import org.apache.flink.runtime.clusterframework.types.ResourceID
-import org.apache.flink.runtime.concurrent.{AcceptFunction, BiFunction}
+import org.apache.flink.runtime.concurrent.{AcceptFunction, BiFunction, 
Executors => FlinkExecutors}
 import org.apache.flink.runtime.execution.SuppressRestartsException
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
@@ -118,7 +118,8 @@ import scala.language.postfixOps
  */
 class JobManager(
     protected val flinkConfiguration: Configuration,
-    protected val executor: Executor,
+    protected val futureExecutor: Executor,
+    protected val ioExecutor: Executor,
     protected val instanceManager: InstanceManager,
     protected val scheduler: FlinkScheduler,
     protected val libraryCacheManager: BlobLibraryCacheManager,
@@ -1247,7 +1248,8 @@ class JobManager(
           executionGraph,
           jobGraph,
           flinkConfiguration,
-          executor,
+          futureExecutor,
+          ioExecutor,
           userCodeLoader,
           checkpointRecoveryFactory,
           Time.of(timeout.length, timeout.unit),
@@ -1976,8 +1978,9 @@ object JobManager {
 
     val ioExecutor = Executors.newFixedThreadPool(
       numberProcessors,
-      new NamedThreadFactory("jobmanager-io-", "-thread-")
-    )
+      new NamedThreadFactory("jobmanager-io-", "-thread-"))
+
+    val timeout = AkkaUtils.getTimeout(configuration)
 
     val (jobManagerSystem, _, _, webMonitorOption, _) = try {
       startActorSystemAndJobManagerActors(
@@ -1993,7 +1996,8 @@ object JobManager {
       )
     } catch {
       case t: Throwable =>
-          futureExecutor.shutdownNow()
+        futureExecutor.shutdownNow()
+        ioExecutor.shutdownNow()
 
         throw t
     }
@@ -2011,8 +2015,11 @@ object JobManager {
         }
     }
 
-    futureExecutor.shutdownNow()
-    ioExecutor.shutdownNow()
+    FlinkExecutors.gracefulShutdown(
+      timeout.toMillis,
+      TimeUnit.MILLISECONDS,
+      futureExecutor,
+      ioExecutor)
   }
 
   /**
@@ -2620,6 +2627,7 @@ object JobManager {
       jobManagerClass,
       configuration,
       futureExecutor,
+      ioExecutor,
       instanceManager,
       scheduler,
       libraryCacheManager,
@@ -2653,7 +2661,8 @@ object JobManager {
   def getJobManagerProps(
     jobManagerClass: Class[_ <: JobManager],
     configuration: Configuration,
-    executor: Executor,
+    futureExecutor: Executor,
+    ioExecutor: Executor,
     instanceManager: InstanceManager,
     scheduler: FlinkScheduler,
     libraryCacheManager: BlobLibraryCacheManager,
@@ -2669,7 +2678,8 @@ object JobManager {
     Props(
       jobManagerClass,
       configuration,
-      executor,
+      futureExecutor,
+      ioExecutor,
       instanceManager,
       scheduler,
       libraryCacheManager,

http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index 4367442..dc59048 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -20,16 +20,18 @@ package org.apache.flink.runtime.minicluster
 
 import java.net.InetAddress
 import java.util.UUID
-import java.util.concurrent.{Executors, ForkJoinPool}
+import java.util.concurrent.{Executors, TimeUnit}
 
 import akka.pattern.Patterns.gracefulStop
 import akka.pattern.ask
 import akka.actor.{ActorRef, ActorSystem}
 import com.typesafe.config.Config
+import org.apache.flink.api.common.time.Time
 import org.apache.flink.api.common.{JobExecutionResult, JobID, 
JobSubmissionResult}
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.client.{JobClient, JobExecutionException}
+import org.apache.flink.runtime.concurrent.{Executors => FlinkExecutors}
 import org.apache.flink.runtime.instance.{ActorGateway, AkkaActorGateway}
 import org.apache.flink.runtime.jobgraph.JobGraph
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode
@@ -409,8 +411,11 @@ abstract class FlinkMiniCluster(
     jobManagerLeaderRetrievalService.foreach(_.stop())
     isRunning = false
 
-    futureExecutor.shutdownNow()
-    ioExecutor.shutdownNow()
+    FlinkExecutors.gracefulShutdown(
+      timeout.toMillis,
+      TimeUnit.MILLISECONDS,
+      futureExecutor,
+      ioExecutor)
   }
 
   protected def shutdown(): Unit = {

http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index b2aedf7..09deadc 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -18,12 +18,12 @@
 
 package org.apache.flink.runtime.minicluster
 
-import java.util.concurrent.ExecutorService
+import java.util.concurrent.{Executor, ExecutorService}
 
 import akka.actor.{ActorRef, ActorSystem, Props}
 import org.apache.flink.api.common.JobID
 import org.apache.flink.api.common.io.FileOutputFormat
-import org.apache.flink.configuration.{QueryableStateOptions, ConfigConstants, 
Configuration}
+import org.apache.flink.configuration.{ConfigConstants, Configuration, 
QueryableStateOptions}
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager
 import 
org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager
@@ -141,6 +141,7 @@ class LocalFlinkMiniCluster(
         jobManagerClass,
         config,
         futureExecutor,
+        ioExecutor,
         instanceManager,
         scheduler,
         libraryCacheManager,
@@ -242,25 +243,28 @@ class LocalFlinkMiniCluster(
   }
 
   def getJobManagerProps(
-    jobManagerClass: Class[_ <: JobManager],
-    configuration: Configuration,
-    executorService: ExecutorService,
-    instanceManager: InstanceManager,
-    scheduler: Scheduler,
-    libraryCacheManager: BlobLibraryCacheManager,
-    archive: ActorRef,
-    restartStrategyFactory: RestartStrategyFactory,
-    timeout: FiniteDuration,
-    leaderElectionService: LeaderElectionService,
-    submittedJobGraphStore: SubmittedJobGraphStore,
-    checkpointRecoveryFactory: CheckpointRecoveryFactory,
-    jobRecoveryTimeout: FiniteDuration,
-    metricsRegistry: Option[MetricRegistry]): Props = {
+      jobManagerClass: Class[_ <: JobManager],
+      configuration: Configuration,
+      futureExecutor: Executor,
+      ioExecutor: Executor,
+      instanceManager: InstanceManager,
+      scheduler: Scheduler,
+      libraryCacheManager: BlobLibraryCacheManager,
+      archive: ActorRef,
+      restartStrategyFactory: RestartStrategyFactory,
+      timeout: FiniteDuration,
+      leaderElectionService: LeaderElectionService,
+      submittedJobGraphStore: SubmittedJobGraphStore,
+      checkpointRecoveryFactory: CheckpointRecoveryFactory,
+      jobRecoveryTimeout: FiniteDuration,
+      metricsRegistry: Option[MetricRegistry])
+    : Props = {
 
     JobManager.getJobManagerProps(
       jobManagerClass,
       configuration,
-      executorService,
+      futureExecutor,
+      ioExecutor,
       instanceManager,
       scheduler,
       libraryCacheManager,

http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index a59ffa2..8e46f4c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.Path;
 import 
org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker;
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.Execution;
@@ -116,19 +117,20 @@ public class CheckpointCoordinatorTest {
 
                        // set up the coordinator and validate the initial state
                        CheckpointCoordinator coord = new CheckpointCoordinator(
-                                       jid,
-                                       600000,
-                                       600000,
-                                       0,
-                                       Integer.MAX_VALUE,
-                                       ExternalizedCheckpointSettings.none(),
-                                       new ExecutionVertex[] { triggerVertex1, 
triggerVertex2 },
-                                       new ExecutionVertex[] { ackVertex1, 
ackVertex2 },
-                                       new ExecutionVertex[] {},
-                                       new StandaloneCheckpointIDCounter(),
-                                       new 
StandaloneCompletedCheckpointStore(1),
-                                       null,
-                                       new DisabledCheckpointStatsTracker());
+                               jid,
+                               600000,
+                               600000,
+                               0,
+                               Integer.MAX_VALUE,
+                               ExternalizedCheckpointSettings.none(),
+                               new ExecutionVertex[] { triggerVertex1, 
triggerVertex2 },
+                               new ExecutionVertex[] { ackVertex1, ackVertex2 
},
+                               new ExecutionVertex[] {},
+                               new StandaloneCheckpointIDCounter(),
+                               new StandaloneCompletedCheckpointStore(1),
+                               null,
+                               new DisabledCheckpointStatsTracker(),
+                               Executors.directExecutor());
 
                        // nothing should be happening
                        assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -169,19 +171,20 @@ public class CheckpointCoordinatorTest {
 
                        // set up the coordinator and validate the initial state
                        CheckpointCoordinator coord = new CheckpointCoordinator(
-                                       jid,
-                                       600000,
-                                       600000,
-                                       0,
-                                       Integer.MAX_VALUE,
-                                       ExternalizedCheckpointSettings.none(),
-                                       new ExecutionVertex[] { triggerVertex1, 
triggerVertex2 },
-                                       new ExecutionVertex[] { ackVertex1, 
ackVertex2 },
-                                       new ExecutionVertex[] {},
-                                       new StandaloneCheckpointIDCounter(),
-                                       new 
StandaloneCompletedCheckpointStore(1),
-                                       null,
-                                       new DisabledCheckpointStatsTracker());
+                               jid,
+                               600000,
+                               600000,
+                               0,
+                               Integer.MAX_VALUE,
+                               ExternalizedCheckpointSettings.none(),
+                               new ExecutionVertex[] { triggerVertex1, 
triggerVertex2 },
+                               new ExecutionVertex[] { ackVertex1, ackVertex2 
},
+                               new ExecutionVertex[] {},
+                               new StandaloneCheckpointIDCounter(),
+                               new StandaloneCompletedCheckpointStore(1),
+                               null,
+                               new DisabledCheckpointStatsTracker(),
+                               Executors.directExecutor());
 
                        // nothing should be happening
                        assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -220,19 +223,20 @@ public class CheckpointCoordinatorTest {
 
                        // set up the coordinator and validate the initial state
                        CheckpointCoordinator coord = new CheckpointCoordinator(
-                                       jid,
-                                       600000,
-                                       600000,
-                                       0,
-                                       Integer.MAX_VALUE,
-                                       ExternalizedCheckpointSettings.none(),
-                                       new ExecutionVertex[] { triggerVertex1, 
triggerVertex2 },
-                                       new ExecutionVertex[] { ackVertex1, 
ackVertex2 },
-                                       new ExecutionVertex[] {},
-                                       new StandaloneCheckpointIDCounter(),
-                                       new 
StandaloneCompletedCheckpointStore(1),
-                                       null,
-                                       new DisabledCheckpointStatsTracker());
+                               jid,
+                               600000,
+                               600000,
+                               0,
+                               Integer.MAX_VALUE,
+                               ExternalizedCheckpointSettings.none(),
+                               new ExecutionVertex[] { triggerVertex1, 
triggerVertex2 },
+                               new ExecutionVertex[] { ackVertex1, ackVertex2 
},
+                               new ExecutionVertex[] {},
+                               new StandaloneCheckpointIDCounter(),
+                               new StandaloneCompletedCheckpointStore(1),
+                               null,
+                               new DisabledCheckpointStatsTracker(),
+                               Executors.directExecutor());
 
                        // nothing should be happening
                        assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -272,19 +276,20 @@ public class CheckpointCoordinatorTest {
 
                        // set up the coordinator and validate the initial state
                        CheckpointCoordinator coord = new CheckpointCoordinator(
-                                       jid,
-                                       600000,
-                                       600000,
-                                       0,
-                                       Integer.MAX_VALUE,
-                                       ExternalizedCheckpointSettings.none(),
-                                       new ExecutionVertex[] { vertex1, 
vertex2 },
-                                       new ExecutionVertex[] { vertex1, 
vertex2 },
-                                       new ExecutionVertex[] { vertex1, 
vertex2 },
-                                       new StandaloneCheckpointIDCounter(),
-                                       new 
StandaloneCompletedCheckpointStore(1),
-                                       null,
-                                       new DisabledCheckpointStatsTracker());
+                               jid,
+                               600000,
+                               600000,
+                               0,
+                               Integer.MAX_VALUE,
+                               ExternalizedCheckpointSettings.none(),
+                               new ExecutionVertex[] { vertex1, vertex2 },
+                               new ExecutionVertex[] { vertex1, vertex2 },
+                               new ExecutionVertex[] { vertex1, vertex2 },
+                               new StandaloneCheckpointIDCounter(),
+                               new StandaloneCompletedCheckpointStore(1),
+                               null,
+                               new DisabledCheckpointStatsTracker(),
+                               Executors.directExecutor());
 
                        assertEquals(0, coord.getNumberOfPendingCheckpoints());
                        assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -370,19 +375,20 @@ public class CheckpointCoordinatorTest {
 
                        // set up the coordinator and validate the initial state
                        CheckpointCoordinator coord = new CheckpointCoordinator(
-                                       jid,
-                                       600000,
-                                       600000,
-                                       0,
-                                       Integer.MAX_VALUE,
-                                       ExternalizedCheckpointSettings.none(),
-                                       new ExecutionVertex[] { vertex1, 
vertex2 },
-                                       new ExecutionVertex[] { vertex1, 
vertex2 },
-                                       new ExecutionVertex[] { vertex1, 
vertex2 },
-                                       new StandaloneCheckpointIDCounter(),
-                                       new 
StandaloneCompletedCheckpointStore(1),
-                                       null,
-                                       new DisabledCheckpointStatsTracker());
+                               jid,
+                               600000,
+                               600000,
+                               0,
+                               Integer.MAX_VALUE,
+                               ExternalizedCheckpointSettings.none(),
+                               new ExecutionVertex[] { vertex1, vertex2 },
+                               new ExecutionVertex[] { vertex1, vertex2 },
+                               new ExecutionVertex[] { vertex1, vertex2 },
+                               new StandaloneCheckpointIDCounter(),
+                               new StandaloneCompletedCheckpointStore(1),
+                               null,
+                               new DisabledCheckpointStatsTracker(),
+                               Executors.directExecutor());
 
                        assertEquals(0, coord.getNumberOfPendingCheckpoints());
                        assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -487,19 +493,20 @@ public class CheckpointCoordinatorTest {
 
                        // set up the coordinator and validate the initial state
                        CheckpointCoordinator coord = new CheckpointCoordinator(
-                                       jid,
-                                       600000,
-                                       600000,
-                                       0,
-                                       Integer.MAX_VALUE,
-                                       ExternalizedCheckpointSettings.none(),
-                                       new ExecutionVertex[] { vertex1, 
vertex2 },
-                                       new ExecutionVertex[] { vertex1, 
vertex2 },
-                                       new ExecutionVertex[] { vertex1, 
vertex2 },
-                                       new StandaloneCheckpointIDCounter(),
-                                       new 
StandaloneCompletedCheckpointStore(1),
-                                       null,
-                                       new DisabledCheckpointStatsTracker());
+                               jid,
+                               600000,
+                               600000,
+                               0,
+                               Integer.MAX_VALUE,
+                               ExternalizedCheckpointSettings.none(),
+                               new ExecutionVertex[] { vertex1, vertex2 },
+                               new ExecutionVertex[] { vertex1, vertex2 },
+                               new ExecutionVertex[] { vertex1, vertex2 },
+                               new StandaloneCheckpointIDCounter(),
+                               new StandaloneCompletedCheckpointStore(1),
+                               null,
+                               new DisabledCheckpointStatsTracker(),
+                               Executors.directExecutor());
 
                        assertEquals(0, coord.getNumberOfPendingCheckpoints());
                        assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -633,19 +640,20 @@ public class CheckpointCoordinatorTest {
 
                        // set up the coordinator and validate the initial state
                        CheckpointCoordinator coord = new CheckpointCoordinator(
-                                       jid,
-                                       600000,
-                                       600000,
-                                       0,
-                                       Integer.MAX_VALUE,
-                                       ExternalizedCheckpointSettings.none(),
-                                       new ExecutionVertex[] { triggerVertex1, 
triggerVertex2 },
-                                       new ExecutionVertex[] { ackVertex1, 
ackVertex2, ackVertex3 },
-                                       new ExecutionVertex[] { commitVertex },
-                                       new StandaloneCheckpointIDCounter(),
-                                       new 
StandaloneCompletedCheckpointStore(2),
-                                       null,
-                                       new DisabledCheckpointStatsTracker());
+                               jid,
+                               600000,
+                               600000,
+                               0,
+                               Integer.MAX_VALUE,
+                               ExternalizedCheckpointSettings.none(),
+                               new ExecutionVertex[] { triggerVertex1, 
triggerVertex2 },
+                               new ExecutionVertex[] { ackVertex1, ackVertex2, 
ackVertex3 },
+                               new ExecutionVertex[] { commitVertex },
+                               new StandaloneCheckpointIDCounter(),
+                               new StandaloneCompletedCheckpointStore(2),
+                               null,
+                               new DisabledCheckpointStatsTracker(),
+                               Executors.directExecutor());
 
                        assertEquals(0, coord.getNumberOfPendingCheckpoints());
                        assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -767,19 +775,20 @@ public class CheckpointCoordinatorTest {
 
                        // set up the coordinator and validate the initial state
                        CheckpointCoordinator coord = new CheckpointCoordinator(
-                                       jid,
-                                       600000,
-                                       600000,
-                                       0,
-                                       Integer.MAX_VALUE,
-                                       ExternalizedCheckpointSettings.none(),
-                                       new ExecutionVertex[] { triggerVertex1, 
triggerVertex2 },
-                                       new ExecutionVertex[] { ackVertex1, 
ackVertex2, ackVertex3 },
-                                       new ExecutionVertex[] { commitVertex },
-                                       new StandaloneCheckpointIDCounter(),
-                                       new 
StandaloneCompletedCheckpointStore(10),
-                                       null,
-                                       new DisabledCheckpointStatsTracker());
+                               jid,
+                               600000,
+                               600000,
+                               0,
+                               Integer.MAX_VALUE,
+                               ExternalizedCheckpointSettings.none(),
+                               new ExecutionVertex[] { triggerVertex1, 
triggerVertex2 },
+                               new ExecutionVertex[] { ackVertex1, ackVertex2, 
ackVertex3 },
+                               new ExecutionVertex[] { commitVertex },
+                               new StandaloneCheckpointIDCounter(),
+                               new StandaloneCompletedCheckpointStore(10),
+                               null,
+                               new DisabledCheckpointStatsTracker(),
+                               Executors.directExecutor());
 
                        assertEquals(0, coord.getNumberOfPendingCheckpoints());
                        assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -888,19 +897,20 @@ public class CheckpointCoordinatorTest {
                        // the timeout for the checkpoint is a 200 milliseconds
 
                        CheckpointCoordinator coord = new CheckpointCoordinator(
-                                       jid,
-                                       600000,
-                                       200,
-                                       0,
-                                       Integer.MAX_VALUE,
-                                       ExternalizedCheckpointSettings.none(),
-                                       new ExecutionVertex[] { triggerVertex },
-                                       new ExecutionVertex[] { ackVertex1, 
ackVertex2 },
-                                       new ExecutionVertex[] { commitVertex },
-                                       new StandaloneCheckpointIDCounter(),
-                                       new 
StandaloneCompletedCheckpointStore(2),
-                                       null,
-                                       new DisabledCheckpointStatsTracker());
+                               jid,
+                               600000,
+                               200,
+                               0,
+                               Integer.MAX_VALUE,
+                               ExternalizedCheckpointSettings.none(),
+                               new ExecutionVertex[] { triggerVertex },
+                               new ExecutionVertex[] { ackVertex1, ackVertex2 
},
+                               new ExecutionVertex[] { commitVertex },
+                               new StandaloneCheckpointIDCounter(),
+                               new StandaloneCompletedCheckpointStore(2),
+                               null,
+                               new DisabledCheckpointStatsTracker(),
+                               Executors.directExecutor());
 
                        // trigger a checkpoint, partially acknowledged
                        assertTrue(coord.triggerCheckpoint(timestamp, false));
@@ -956,19 +966,20 @@ public class CheckpointCoordinatorTest {
                        ExecutionVertex commitVertex = 
mockExecutionVertex(commitAttemptID);
 
                        CheckpointCoordinator coord = new CheckpointCoordinator(
-                                       jid,
-                                       200000,
-                                       200000,
-                                       0,
-                                       Integer.MAX_VALUE,
-                                       ExternalizedCheckpointSettings.none(),
-                                       new ExecutionVertex[] { triggerVertex },
-                                       new ExecutionVertex[] { ackVertex1, 
ackVertex2 },
-                                       new ExecutionVertex[] { commitVertex },
-                                       new StandaloneCheckpointIDCounter(),
-                                       new 
StandaloneCompletedCheckpointStore(2),
-                                       null,
-                                       new DisabledCheckpointStatsTracker());
+                               jid,
+                               200000,
+                               200000,
+                               0,
+                               Integer.MAX_VALUE,
+                               ExternalizedCheckpointSettings.none(),
+                               new ExecutionVertex[] { triggerVertex },
+                               new ExecutionVertex[] { ackVertex1, ackVertex2 
},
+                               new ExecutionVertex[] { commitVertex },
+                               new StandaloneCheckpointIDCounter(),
+                               new StandaloneCompletedCheckpointStore(2),
+                               null,
+                               new DisabledCheckpointStatsTracker(),
+                               Executors.directExecutor());
 
                        assertTrue(coord.triggerCheckpoint(timestamp, false));
 
@@ -1033,7 +1044,8 @@ public class CheckpointCoordinatorTest {
                        new StandaloneCheckpointIDCounter(),
                        new StandaloneCompletedCheckpointStore(1),
                        null,
-                       new DisabledCheckpointStatsTracker());
+                       new DisabledCheckpointStatsTracker(),
+                       Executors.directExecutor());
 
                assertTrue(coord.triggerCheckpoint(timestamp, false));
 
@@ -1145,19 +1157,20 @@ public class CheckpointCoordinatorTest {
                        }).when(execution).triggerCheckpoint(anyLong(), 
anyLong());
                        
                        CheckpointCoordinator coord = new CheckpointCoordinator(
-                                       jid,
-                                       10,        // periodic interval is 10 ms
-                                       200000,    // timeout is very long (200 
s)
-                                       0,
-                                       Integer.MAX_VALUE,
-                                       ExternalizedCheckpointSettings.none(),
-                                       new ExecutionVertex[] { triggerVertex },
-                                       new ExecutionVertex[] { ackVertex },
-                                       new ExecutionVertex[] { commitVertex },
-                                       new StandaloneCheckpointIDCounter(),
-                                       new 
StandaloneCompletedCheckpointStore(2),
-                                       null,
-                                       new DisabledCheckpointStatsTracker());
+                               jid,
+                               10,        // periodic interval is 10 ms
+                               200000,    // timeout is very long (200 s)
+                               0,
+                               Integer.MAX_VALUE,
+                               ExternalizedCheckpointSettings.none(),
+                               new ExecutionVertex[] { triggerVertex },
+                               new ExecutionVertex[] { ackVertex },
+                               new ExecutionVertex[] { commitVertex },
+                               new StandaloneCheckpointIDCounter(),
+                               new StandaloneCompletedCheckpointStore(2),
+                               null,
+                               new DisabledCheckpointStatsTracker(),
+                               Executors.directExecutor());
 
                        
                        coord.startCheckpointScheduler();
@@ -1236,19 +1249,20 @@ public class CheckpointCoordinatorTest {
                        }).when(execution).triggerCheckpoint(anyLong(), 
anyLong());
 
                        CheckpointCoordinator coord = new CheckpointCoordinator(
-                                       jid,
-                                       10,        // periodic interval is 10 ms
-                                       200000,    // timeout is very long (200 
s)
-                                       500,    // 500ms delay between 
checkpoints
-                                       10,
-                                       ExternalizedCheckpointSettings.none(),
-                                       new ExecutionVertex[] { vertex1 },
-                                       new ExecutionVertex[] { vertex1 },
-                                       new ExecutionVertex[] { vertex1 },
-                                       new StandaloneCheckpointIDCounter(),
-                                       new 
StandaloneCompletedCheckpointStore(2),
-                                       null,
-                                       new DisabledCheckpointStatsTracker());
+                               jid,
+                               10,        // periodic interval is 10 ms
+                               200000,    // timeout is very long (200 s)
+                               500,    // 500ms delay between checkpoints
+                               10,
+                               ExternalizedCheckpointSettings.none(),
+                               new ExecutionVertex[] { vertex1 },
+                               new ExecutionVertex[] { vertex1 },
+                               new ExecutionVertex[] { vertex1 },
+                               new StandaloneCheckpointIDCounter(),
+                               new StandaloneCompletedCheckpointStore(2),
+                               null,
+                               new DisabledCheckpointStatsTracker(),
+                               Executors.directExecutor());
 
                        coord.startCheckpointScheduler();
 
@@ -1321,19 +1335,20 @@ public class CheckpointCoordinatorTest {
 
                // set up the coordinator and validate the initial state
                CheckpointCoordinator coord = new CheckpointCoordinator(
-                               jid,
-                               600000,
-                               600000,
-                               0,
-                               Integer.MAX_VALUE,
-                               ExternalizedCheckpointSettings.none(),
-                               new ExecutionVertex[] { vertex1, vertex2 },
-                               new ExecutionVertex[] { vertex1, vertex2 },
-                               new ExecutionVertex[] { vertex1, vertex2 },
-                               new StandaloneCheckpointIDCounter(),
-                               new StandaloneCompletedCheckpointStore(1),
-                               null,
-                               new DisabledCheckpointStatsTracker());
+                       jid,
+                       600000,
+                       600000,
+                       0,
+                       Integer.MAX_VALUE,
+                       ExternalizedCheckpointSettings.none(),
+                       new ExecutionVertex[] { vertex1, vertex2 },
+                       new ExecutionVertex[] { vertex1, vertex2 },
+                       new ExecutionVertex[] { vertex1, vertex2 },
+                       new StandaloneCheckpointIDCounter(),
+                       new StandaloneCompletedCheckpointStore(1),
+                       null,
+                       new DisabledCheckpointStatsTracker(),
+                       Executors.directExecutor());
 
                assertEquals(0, coord.getNumberOfPendingCheckpoints());
                assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -1456,19 +1471,20 @@ public class CheckpointCoordinatorTest {
 
                // set up the coordinator and validate the initial state
                CheckpointCoordinator coord = new CheckpointCoordinator(
-                               jid,
-                               600000,
-                               600000,
-                               0,
-                               Integer.MAX_VALUE,
-                               ExternalizedCheckpointSettings.none(),
-                               new ExecutionVertex[] { vertex1, vertex2 },
-                               new ExecutionVertex[] { vertex1, vertex2 },
-                               new ExecutionVertex[] { vertex1, vertex2 },
-                               counter,
-                               new StandaloneCompletedCheckpointStore(10),
-                               null,
-                               new DisabledCheckpointStatsTracker());
+                       jid,
+                       600000,
+                       600000,
+                       0,
+                       Integer.MAX_VALUE,
+                       ExternalizedCheckpointSettings.none(),
+                       new ExecutionVertex[] { vertex1, vertex2 },
+                       new ExecutionVertex[] { vertex1, vertex2 },
+                       new ExecutionVertex[] { vertex1, vertex2 },
+                       counter,
+                       new StandaloneCompletedCheckpointStore(10),
+                       null,
+                       new DisabledCheckpointStatsTracker(),
+                       Executors.directExecutor());
 
                String savepointDir = tmpFolder.newFolder().getAbsolutePath();
 
@@ -1559,19 +1575,20 @@ public class CheckpointCoordinatorTest {
                        }).when(execution).notifyCheckpointComplete(anyLong(), 
anyLong());
 
                        CheckpointCoordinator coord = new CheckpointCoordinator(
-                                       jid,
-                                       10,        // periodic interval is 10 ms
-                                       200000,    // timeout is very long (200 
s)
-                                       0L,        // no extra delay
-                                       maxConcurrentAttempts,
-                                       ExternalizedCheckpointSettings.none(),
-                                       new ExecutionVertex[] { triggerVertex },
-                                       new ExecutionVertex[] { ackVertex },
-                                       new ExecutionVertex[] { commitVertex },
-                                       new StandaloneCheckpointIDCounter(),
-                                       new 
StandaloneCompletedCheckpointStore(2),
-                                       null,
-                                       new DisabledCheckpointStatsTracker());
+                               jid,
+                               10,        // periodic interval is 10 ms
+                               200000,    // timeout is very long (200 s)
+                               0L,        // no extra delay
+                               maxConcurrentAttempts,
+                               ExternalizedCheckpointSettings.none(),
+                               new ExecutionVertex[] { triggerVertex },
+                               new ExecutionVertex[] { ackVertex },
+                               new ExecutionVertex[] { commitVertex },
+                               new StandaloneCheckpointIDCounter(),
+                               new StandaloneCompletedCheckpointStore(2),
+                               null,
+                               new DisabledCheckpointStatsTracker(),
+                               Executors.directExecutor());
 
                        coord.startCheckpointScheduler();
 
@@ -1632,19 +1649,20 @@ public class CheckpointCoordinatorTest {
                        ExecutionVertex commitVertex = mockExecutionVertex(commitAttemptID);
 
                        CheckpointCoordinator coord = new CheckpointCoordinator(
-                                       jid,
-                                       10,        // periodic interval is 10 ms
-                                       200000,    // timeout is very long (200 s)
-                                       0L,        // no extra delay
-                                       maxConcurrentAttempts, // max two concurrent checkpoints
-                                       ExternalizedCheckpointSettings.none(),
-                                       new ExecutionVertex[] { triggerVertex },
-                                       new ExecutionVertex[] { ackVertex },
-                                       new ExecutionVertex[] { commitVertex },
-                                       new StandaloneCheckpointIDCounter(),
-                                       new StandaloneCompletedCheckpointStore(2),
-                                       null,
-                                       new DisabledCheckpointStatsTracker());
+                               jid,
+                               10,        // periodic interval is 10 ms
+                               200000,    // timeout is very long (200 s)
+                               0L,        // no extra delay
+                               maxConcurrentAttempts, // max two concurrent checkpoints
+                               ExternalizedCheckpointSettings.none(),
+                               new ExecutionVertex[] { triggerVertex },
+                               new ExecutionVertex[] { ackVertex },
+                               new ExecutionVertex[] { commitVertex },
+                               new StandaloneCheckpointIDCounter(),
+                               new StandaloneCompletedCheckpointStore(2),
+                               null,
+                               new DisabledCheckpointStatsTracker(),
+                               Executors.directExecutor());
 
                        coord.startCheckpointScheduler();
 
@@ -1714,19 +1732,20 @@ public class CheckpointCoordinatorTest {
                                        });
                        
                        CheckpointCoordinator coord = new CheckpointCoordinator(
-                                       jid,
-                                       10,        // periodic interval is 10 ms
-                                       200000,    // timeout is very long (200 s)
-                                       0L,        // no extra delay
-                                       2, // max two concurrent checkpoints
-                                       ExternalizedCheckpointSettings.none(),
-                                       new ExecutionVertex[] { triggerVertex },
-                                       new ExecutionVertex[] { ackVertex },
-                                       new ExecutionVertex[] { commitVertex },
-                                       new StandaloneCheckpointIDCounter(),
-                                       new StandaloneCompletedCheckpointStore(2),
-                                       null,
-                                       new DisabledCheckpointStatsTracker());
+                               jid,
+                               10,        // periodic interval is 10 ms
+                               200000,    // timeout is very long (200 s)
+                               0L,        // no extra delay
+                               2, // max two concurrent checkpoints
+                               ExternalizedCheckpointSettings.none(),
+                               new ExecutionVertex[] { triggerVertex },
+                               new ExecutionVertex[] { ackVertex },
+                               new ExecutionVertex[] { commitVertex },
+                               new StandaloneCheckpointIDCounter(),
+                               new StandaloneCompletedCheckpointStore(2),
+                               null,
+                               new DisabledCheckpointStatsTracker(),
+                               Executors.directExecutor());
                        
                        coord.startCheckpointScheduler();
 
@@ -1766,19 +1785,20 @@ public class CheckpointCoordinatorTest {
                StandaloneCheckpointIDCounter checkpointIDCounter = new StandaloneCheckpointIDCounter();
 
                CheckpointCoordinator coord = new CheckpointCoordinator(
-                               jobId,
-                               100000,
-                               200000,
-                               0L,
-                               1, // max one checkpoint at a time => should not affect savepoints
-                               ExternalizedCheckpointSettings.none(),
-                               new ExecutionVertex[] { vertex1 },
-                               new ExecutionVertex[] { vertex1 },
-                               new ExecutionVertex[] { vertex1 },
-                               checkpointIDCounter,
-                               new StandaloneCompletedCheckpointStore(2),
-                               null,
-                               new DisabledCheckpointStatsTracker());
+                       jobId,
+                       100000,
+                       200000,
+                       0L,
+                       1, // max one checkpoint at a time => should not affect savepoints
+                       ExternalizedCheckpointSettings.none(),
+                       new ExecutionVertex[] { vertex1 },
+                       new ExecutionVertex[] { vertex1 },
+                       new ExecutionVertex[] { vertex1 },
+                       checkpointIDCounter,
+                       new StandaloneCompletedCheckpointStore(2),
+                       null,
+                       new DisabledCheckpointStatsTracker(),
+                       Executors.directExecutor());
 
                List<Future<CompletedCheckpoint>> savepointFutures = new ArrayList<>();
 
@@ -1819,19 +1839,20 @@ public class CheckpointCoordinatorTest {
                ExecutionVertex vertex1 = mockExecutionVertex(attemptID1);
 
                CheckpointCoordinator coord = new CheckpointCoordinator(
-                               jobId,
-                               100000,
-                               200000,
-                               100000000L, // very long min delay => should not affect savepoints
-                               1,
-                               ExternalizedCheckpointSettings.none(),
-                               new ExecutionVertex[] { vertex1 },
-                               new ExecutionVertex[] { vertex1 },
-                               new ExecutionVertex[] { vertex1 },
-                               new StandaloneCheckpointIDCounter(),
-                               new StandaloneCompletedCheckpointStore(2),
-                               null,
-                               new DisabledCheckpointStatsTracker());
+                       jobId,
+                       100000,
+                       200000,
+                       100000000L, // very long min delay => should not affect savepoints
+                       1,
+                       ExternalizedCheckpointSettings.none(),
+                       new ExecutionVertex[] { vertex1 },
+                       new ExecutionVertex[] { vertex1 },
+                       new ExecutionVertex[] { vertex1 },
+                       new StandaloneCheckpointIDCounter(),
+                       new StandaloneCompletedCheckpointStore(2),
+                       null,
+                       new DisabledCheckpointStatsTracker(),
+                       Executors.directExecutor());
 
                String savepointDir = tmpFolder.newFolder().getAbsolutePath();
 
@@ -1879,19 +1900,20 @@ public class CheckpointCoordinatorTest {
 
                // set up the coordinator and validate the initial state
                CheckpointCoordinator coord = new CheckpointCoordinator(
-                               jid,
-                               600000,
-                               600000,
-                               0,
-                               Integer.MAX_VALUE,
-                               ExternalizedCheckpointSettings.none(),
-                               arrayExecutionVertices,
-                               arrayExecutionVertices,
-                               arrayExecutionVertices,
-                               new StandaloneCheckpointIDCounter(),
-                               new StandaloneCompletedCheckpointStore(1),
-                               null,
-                               new DisabledCheckpointStatsTracker());
+                       jid,
+                       600000,
+                       600000,
+                       0,
+                       Integer.MAX_VALUE,
+                       ExternalizedCheckpointSettings.none(),
+                       arrayExecutionVertices,
+                       arrayExecutionVertices,
+                       arrayExecutionVertices,
+                       new StandaloneCheckpointIDCounter(),
+                       new StandaloneCompletedCheckpointStore(1),
+                       null,
+                       new DisabledCheckpointStatsTracker(),
+                       Executors.directExecutor());
 
                // trigger the checkpoint
                coord.triggerCheckpoint(timestamp, false);
@@ -1984,19 +2006,20 @@ public class CheckpointCoordinatorTest {
 
                // set up the coordinator and validate the initial state
                CheckpointCoordinator coord = new CheckpointCoordinator(
-                               jid,
-                               600000,
-                               600000,
-                               0,
-                               Integer.MAX_VALUE,
-                               ExternalizedCheckpointSettings.none(),
-                               arrayExecutionVertices,
-                               arrayExecutionVertices,
-                               arrayExecutionVertices,
-                               new StandaloneCheckpointIDCounter(),
-                               new StandaloneCompletedCheckpointStore(1),
-                               null,
-                               new DisabledCheckpointStatsTracker());
+                       jid,
+                       600000,
+                       600000,
+                       0,
+                       Integer.MAX_VALUE,
+                       ExternalizedCheckpointSettings.none(),
+                       arrayExecutionVertices,
+                       arrayExecutionVertices,
+                       arrayExecutionVertices,
+                       new StandaloneCheckpointIDCounter(),
+                       new StandaloneCompletedCheckpointStore(1),
+                       null,
+                       new DisabledCheckpointStatsTracker(),
+                       Executors.directExecutor());
 
                // trigger the checkpoint
                coord.triggerCheckpoint(timestamp, false);
@@ -2099,19 +2122,20 @@ public class CheckpointCoordinatorTest {
 
                // set up the coordinator and validate the initial state
                CheckpointCoordinator coord = new CheckpointCoordinator(
-                               jid,
-                               600000,
-                               600000,
-                               0,
-                               Integer.MAX_VALUE,
-                               ExternalizedCheckpointSettings.none(),
-                               arrayExecutionVertices,
-                               arrayExecutionVertices,
-                               arrayExecutionVertices,
-                               new StandaloneCheckpointIDCounter(),
-                               new StandaloneCompletedCheckpointStore(1),
-                               null,
-                               new DisabledCheckpointStatsTracker());
+                       jid,
+                       600000,
+                       600000,
+                       0,
+                       Integer.MAX_VALUE,
+                       ExternalizedCheckpointSettings.none(),
+                       arrayExecutionVertices,
+                       arrayExecutionVertices,
+                       arrayExecutionVertices,
+                       new StandaloneCheckpointIDCounter(),
+                       new StandaloneCompletedCheckpointStore(1),
+                       null,
+                       new DisabledCheckpointStatsTracker(),
+                       Executors.directExecutor());
 
                // trigger the checkpoint
                coord.triggerCheckpoint(timestamp, false);
@@ -2234,19 +2258,20 @@ public class CheckpointCoordinatorTest {
 
                // set up the coordinator and validate the initial state
                CheckpointCoordinator coord = new CheckpointCoordinator(
-                               jid,
-                               600000,
-                               600000,
-                               0,
-                               Integer.MAX_VALUE,
-                               ExternalizedCheckpointSettings.none(),
-                               arrayExecutionVertices,
-                               arrayExecutionVertices,
-                               arrayExecutionVertices,
-                               new StandaloneCheckpointIDCounter(),
-                               new StandaloneCompletedCheckpointStore(1),
-                               null,
-                               new DisabledCheckpointStatsTracker());
+                       jid,
+                       600000,
+                       600000,
+                       0,
+                       Integer.MAX_VALUE,
+                       ExternalizedCheckpointSettings.none(),
+                       arrayExecutionVertices,
+                       arrayExecutionVertices,
+                       arrayExecutionVertices,
+                       new StandaloneCheckpointIDCounter(),
+                       new StandaloneCompletedCheckpointStore(1),
+                       null,
+                       new DisabledCheckpointStatsTracker(),
+                       Executors.directExecutor());
 
                // trigger the checkpoint
                coord.triggerCheckpoint(timestamp, false);
@@ -2365,19 +2390,20 @@ public class CheckpointCoordinatorTest {
 
                        // set up the coordinator and validate the initial state
                        CheckpointCoordinator coord = new CheckpointCoordinator(
-                                       jid,
-                                       600000,
-                                       600000,
-                                       0,
-                                       Integer.MAX_VALUE,
-                                       ExternalizedCheckpointSettings.externalizeCheckpoints(true),
-                                       new ExecutionVertex[] { vertex1 },
-                                       new ExecutionVertex[] { vertex1 },
-                                       new ExecutionVertex[] { vertex1 },
-                                       new StandaloneCheckpointIDCounter(),
-                                       new StandaloneCompletedCheckpointStore(1),
-                                       "fake-directory",
-                                       new DisabledCheckpointStatsTracker());
+                               jid,
+                               600000,
+                               600000,
+                               0,
+                               Integer.MAX_VALUE,
+                               ExternalizedCheckpointSettings.externalizeCheckpoints(true),
+                               new ExecutionVertex[] { vertex1 },
+                               new ExecutionVertex[] { vertex1 },
+                               new ExecutionVertex[] { vertex1 },
+                               new StandaloneCheckpointIDCounter(),
+                               new StandaloneCompletedCheckpointStore(1),
+                               "fake-directory",
+                               new DisabledCheckpointStatsTracker(),
+                               Executors.directExecutor());
 
                        assertTrue(coord.triggerCheckpoint(timestamp, false));
 
@@ -2741,19 +2767,20 @@ public class CheckpointCoordinatorTest {
 
                // set up the coordinator and validate the initial state
                CheckpointCoordinator coord = new CheckpointCoordinator(
-                               new JobID(),
-                               600000,
-                               600000,
-                               0,
-                               Integer.MAX_VALUE,
-                               ExternalizedCheckpointSettings.none(),
-                               new ExecutionVertex[] { vertex1 },
-                               new ExecutionVertex[] { vertex1 },
-                               new ExecutionVertex[] { vertex1 },
-                               new StandaloneCheckpointIDCounter(),
-                               new StandaloneCompletedCheckpointStore(1),
-                               null,
-                               new DisabledCheckpointStatsTracker());
+                       new JobID(),
+                       600000,
+                       600000,
+                       0,
+                       Integer.MAX_VALUE,
+                       ExternalizedCheckpointSettings.none(),
+                       new ExecutionVertex[] { vertex1 },
+                       new ExecutionVertex[] { vertex1 },
+                       new ExecutionVertex[] { vertex1 },
+                       new StandaloneCheckpointIDCounter(),
+                       new StandaloneCompletedCheckpointStore(1),
+                       null,
+                       new DisabledCheckpointStatsTracker(),
+                       Executors.directExecutor());
 
                // Periodic
                CheckpointTriggerResult triggerResult = coord.triggerCheckpoint(

http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
index 6e5279b..7cea130 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker;
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -40,7 +41,6 @@ import org.hamcrest.Description;
 import org.junit.Test;
 import org.mockito.Mockito;
 
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -97,19 +97,20 @@ public class CheckpointStateRestoreTest {
                        map.put(statelessId, stateless);
 
                        CheckpointCoordinator coord = new CheckpointCoordinator(
-                                       jid,
-                                       200000L,
-                                       200000L,
-                                       0,
-                                       Integer.MAX_VALUE,
-                                       ExternalizedCheckpointSettings.none(),
-                                       new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 },
-                                       new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 },
-                                       new ExecutionVertex[0],
-                                       new StandaloneCheckpointIDCounter(),
-                                       new StandaloneCompletedCheckpointStore(1),
-                                       null,
-                                       new DisabledCheckpointStatsTracker());
+                               jid,
+                               200000L,
+                               200000L,
+                               0,
+                               Integer.MAX_VALUE,
+                               ExternalizedCheckpointSettings.none(),
+                               new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 },
+                               new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 },
+                               new ExecutionVertex[0],
+                               new StandaloneCheckpointIDCounter(),
+                               new StandaloneCompletedCheckpointStore(1),
+                               null,
+                               new DisabledCheckpointStatsTracker(),
+                               Executors.directExecutor());
 
                        // create ourselves a checkpoint with state
                        final long timestamp = 34623786L;
@@ -172,19 +173,20 @@ public class CheckpointStateRestoreTest {
        public void testNoCheckpointAvailable() {
                try {
                        CheckpointCoordinator coord = new CheckpointCoordinator(
-                                       new JobID(),
-                                       200000L,
-                                       200000L,
-                                       0,
-                                       Integer.MAX_VALUE,
-                                       ExternalizedCheckpointSettings.none(),
-                                       new ExecutionVertex[] { mock(ExecutionVertex.class) },
-                                       new ExecutionVertex[] { mock(ExecutionVertex.class) },
-                                       new ExecutionVertex[0],
-                                       new StandaloneCheckpointIDCounter(),
-                                       new StandaloneCompletedCheckpointStore(1),
-                                       null,
-                                       new DisabledCheckpointStatsTracker());
+                               new JobID(),
+                               200000L,
+                               200000L,
+                               0,
+                               Integer.MAX_VALUE,
+                               ExternalizedCheckpointSettings.none(),
+                               new ExecutionVertex[] { mock(ExecutionVertex.class) },
+                               new ExecutionVertex[] { mock(ExecutionVertex.class) },
+                               new ExecutionVertex[0],
+                               new StandaloneCheckpointIDCounter(),
+                               new StandaloneCompletedCheckpointStore(1),
+                               null,
+                               new DisabledCheckpointStatsTracker(),
+                               Executors.directExecutor());
 
                        try {
                                coord.restoreLatestCheckpointedState(new HashMap<JobVertexID, ExecutionJobVertex>(), true, false);
@@ -227,19 +229,20 @@ public class CheckpointStateRestoreTest {
                tasks.put(jobVertexId2, jobVertex2);
 
                CheckpointCoordinator coord = new CheckpointCoordinator(
-                               new JobID(),
-                               Integer.MAX_VALUE,
-                               Integer.MAX_VALUE,
-                               0,
-                               Integer.MAX_VALUE,
-                               ExternalizedCheckpointSettings.none(),
-                               new ExecutionVertex[] {},
-                               new ExecutionVertex[] {},
-                               new ExecutionVertex[] {},
-                               new StandaloneCheckpointIDCounter(),
-                               new StandaloneCompletedCheckpointStore(1),
-                               null,
-                               new DisabledCheckpointStatsTracker());
+                       new JobID(),
+                       Integer.MAX_VALUE,
+                       Integer.MAX_VALUE,
+                       0,
+                       Integer.MAX_VALUE,
+                       ExternalizedCheckpointSettings.none(),
+                       new ExecutionVertex[] {},
+                       new ExecutionVertex[] {},
+                       new ExecutionVertex[] {},
+                       new StandaloneCheckpointIDCounter(),
+                       new StandaloneCompletedCheckpointStore(1),
+                       null,
+                       new DisabledCheckpointStatsTracker(),
+                       Executors.directExecutor());
 
                ChainedStateHandle<StreamStateHandle> serializedState = CheckpointCoordinatorTest
                                .generateChainedStateHandle(new SerializableObject());

http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
index c8c9350..c6c7ae5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
@@ -97,6 +97,7 @@ public class ExecutionGraphCheckpointCoordinatorTest {
                        CompletedCheckpointStore store) throws Exception {
                ExecutionGraph executionGraph = new ExecutionGraph(
                        TestingUtils.defaultExecutionContext(),
+                       TestingUtils.defaultExecutionContext(),
                        new JobID(),
                        "test",
                        new Configuration(),

http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
index 84f0e1f..e918965 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
@@ -210,7 +211,15 @@ public class PendingCheckpointTest {
 
        private static PendingCheckpoint createPendingCheckpoint(CheckpointProperties props, String targetDirectory) {
                Map<ExecutionAttemptID, ExecutionVertex> ackTasks = new HashMap<>(ACK_TASKS);
-               return new PendingCheckpoint(new JobID(), 0, 1, ackTasks, false, props, targetDirectory);
+               return new PendingCheckpoint(
+                       new JobID(),
+                       0,
+                       1,
+                       ackTasks,
+                       false,
+                       props,
+                       targetDirectory,
+                       Executors.directExecutor());
        }
 
        @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/flink/blob/c590912c/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java
index 34043eb..0223a2e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java
@@ -50,7 +50,7 @@ public class AllVerticesIteratorTest {
                        v4.setParallelism(2);
                        
                        ExecutionGraph eg = Mockito.mock(ExecutionGraph.class);
-                       Mockito.when(eg.getExecutor()).thenReturn(TestingUtils.directExecutionContext());
+                       Mockito.when(eg.getFutureExecutor()).thenReturn(TestingUtils.directExecutionContext());
                                        
                        ExecutionJobVertex ejv1 = new ExecutionJobVertex(eg, v1, 1,
                                        AkkaUtils.getDefaultTimeout());

Reply via email to