http://git-wip-us.apache.org/repos/asf/flink/blob/c6486067/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index b94f904..abc59cf 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -18,26 +18,19 @@
 
 package org.apache.flink.runtime.jobmaster;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
-import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
-import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
-import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
-import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
-import 
org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker;
-import org.apache.flink.runtime.checkpoint.stats.SimpleCheckpointStatsTracker;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.client.JobSubmissionException;
 import org.apache.flink.runtime.client.SerializedJobExecutionResult;
@@ -48,9 +41,10 @@ import 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.executiongraph.ExecutionGraphException;
+import org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.IntermediateResult;
+import org.apache.flink.runtime.executiongraph.JobStatusListener;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
@@ -59,16 +53,11 @@ import 
org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
-import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.jobmaster.message.ClassloadingProps;
-import org.apache.flink.runtime.jobmaster.message.DisposeSavepointResponse;
-import org.apache.flink.runtime.jobmaster.message.TriggerSavepointResponse;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.Acknowledge;
@@ -84,22 +73,26 @@ import 
org.apache.flink.runtime.registration.RegisteredRpcConnection;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.registration.RetryingRegistration;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcMethod;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.StartStoppable;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.util.SerializedThrowable;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.SerializedValue;
+
 import org.slf4j.Logger;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -110,16 +103,21 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  * It offers the following methods as part of its rpc interface to interact 
with the JobMaster
  * remotely:
  * <ul>
- * <li>{@link #updateTaskExecutionState(TaskExecutionState)} updates the task 
execution state for
+ * <li>{@link #updateTaskExecutionState} updates the task execution state for
  * given task</li>
  * </ul>
  */
 public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 
+       private static final AtomicReferenceFieldUpdater<JobMaster, UUID> 
LEADER_ID_UPDATER =
+                       AtomicReferenceFieldUpdater.newUpdater(JobMaster.class, 
UUID.class, "leaderSessionID");
+
+       // 
------------------------------------------------------------------------
+
        /** Logical representation of the job */
        private final JobGraph jobGraph;
 
-       /** Configuration of the job */
+       /** Configuration of the JobManager */
        private final Configuration configuration;
 
        /** Service to contend for and retrieve the leadership of JM and RM */
@@ -128,37 +126,24 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
        /** Blob cache manager used across jobs */
        private final BlobLibraryCacheManager libraryCacheManager;
 
-       /** Factory to create restart strategy for this job */
-       private final RestartStrategyFactory restartStrategyFactory;
-
-       /** Store for save points */
-       private final SavepointStore savepointStore;
-
-       /** The timeout for this job */
-       private final Time timeout;
-
-       /** The scheduler to use for scheduling new tasks as they are needed */
-       private final Scheduler scheduler;
+       /** The metrics for the JobManager itself */
+       private final MetricGroup jobManagerMetricGroup;
 
-       /** The metrics group used across jobs */
-       private final JobManagerMetricGroup jobManagerMetricGroup;
+       /** The metrics for the job */
+       private final MetricGroup jobMetricGroup;
 
        /** The execution context which is used to execute futures */
-       private final Executor executionContext;
+       private final ExecutorService executionContext;
 
        private final OnCompletionActions jobCompletionActions;
 
-       /** The execution graph of this job */
-       private volatile ExecutionGraph executionGraph;
-
-       /** The checkpoint recovery factory used by this job */
-       private CheckpointRecoveryFactory checkpointRecoveryFactory;
+       private final FatalErrorHandler errorHandler;
 
-       private ClassLoader userCodeLoader;
+       private final ClassLoader userCodeLoader;
 
-       private RestartStrategy restartStrategy;
+       /** The execution graph of this job */
+       private final ExecutionGraph executionGraph;
 
-       private MetricGroup jobMetrics;
 
        private volatile UUID leaderSessionID;
 
@@ -168,22 +153,26 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
        private LeaderRetrievalService resourceManagerLeaderRetriever;
 
        /** Connection with ResourceManager, null if not located address yet or 
we close it initiative */
-       private volatile ResourceManagerConnection resourceManagerConnection;
+       private ResourceManagerConnection resourceManagerConnection;
+
+       // TODO - we need to replace this with the slot pool
+       private final Scheduler scheduler;
 
        // 
------------------------------------------------------------------------
 
        public JobMaster(
-               JobGraph jobGraph,
-               Configuration configuration,
-               RpcService rpcService,
-               HighAvailabilityServices highAvailabilityService,
-               BlobLibraryCacheManager libraryCacheManager,
-               RestartStrategyFactory restartStrategyFactory,
-               SavepointStore savepointStore,
-               Time timeout,
-               Scheduler scheduler,
-               JobManagerMetricGroup jobManagerMetricGroup,
-               OnCompletionActions jobCompletionActions)
+                       JobGraph jobGraph,
+                       Configuration configuration,
+                       RpcService rpcService,
+                       HighAvailabilityServices highAvailabilityService,
+                       ExecutorService executorService,
+                       BlobLibraryCacheManager libraryCacheManager,
+                       RestartStrategyFactory restartStrategyFactory,
+                       Time rpcAskTimeout,
+                       @Nullable JobManagerMetricGroup jobManagerMetricGroup,
+                       OnCompletionActions jobCompletionActions,
+                       FatalErrorHandler errorHandler,
+                       ClassLoader userCodeLoader) throws Exception
        {
                super(rpcService);
 
@@ -191,293 +180,150 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
                this.configuration = checkNotNull(configuration);
                this.highAvailabilityServices = 
checkNotNull(highAvailabilityService);
                this.libraryCacheManager = checkNotNull(libraryCacheManager);
-               this.restartStrategyFactory = 
checkNotNull(restartStrategyFactory);
-               this.savepointStore = checkNotNull(savepointStore);
-               this.timeout = checkNotNull(timeout);
-               this.scheduler = checkNotNull(scheduler);
-               this.jobManagerMetricGroup = 
checkNotNull(jobManagerMetricGroup);
-               this.executionContext = checkNotNull(rpcService.getExecutor());
+               this.executionContext = checkNotNull(executorService);
                this.jobCompletionActions = checkNotNull(jobCompletionActions);
-       }
-
-       
//----------------------------------------------------------------------------------------------
-       // Lifecycle management
-       
//----------------------------------------------------------------------------------------------
+               this.errorHandler = checkNotNull(errorHandler);
+               this.userCodeLoader = checkNotNull(userCodeLoader);
 
-       /**
-        * Initializing the job execution environment, should be called before 
start. Any error occurred during
-        * initialization will be treated as job submission failure.
-        *
-        * @throws JobSubmissionException
-        */
-       public void init() throws JobSubmissionException {
-               log.info("Initializing job {} ({}).", jobGraph.getName(), 
jobGraph.getJobID());
+               final String jobName = jobGraph.getName();
+               final JobID jid = jobGraph.getJobID();
 
-               try {
-                       // IMPORTANT: We need to make sure that the library 
registration is the first action,
-                       // because this makes sure that the uploaded jar files 
are removed in case of
-                       // unsuccessful
-                       try {
-                               
libraryCacheManager.registerJob(jobGraph.getJobID(), 
jobGraph.getUserJarBlobKeys(),
-                                       jobGraph.getClasspaths());
-                       } catch (Throwable t) {
-                               throw new 
JobSubmissionException(jobGraph.getJobID(),
-                                       "Cannot set up the user code libraries: 
" + t.getMessage(), t);
-                       }
+               if (jobManagerMetricGroup != null) {
+                       this.jobManagerMetricGroup = jobManagerMetricGroup;
+                       this.jobMetricGroup = 
jobManagerMetricGroup.addJob(jobGraph);
+               } else {
+                       this.jobManagerMetricGroup = new 
UnregisteredMetricsGroup();
+                       this.jobMetricGroup = new UnregisteredMetricsGroup();
+               }
 
-                       userCodeLoader = 
libraryCacheManager.getClassLoader(jobGraph.getJobID());
-                       if (userCodeLoader == null) {
-                               throw new 
JobSubmissionException(jobGraph.getJobID(),
-                                       "The user code class loader could not 
be initialized.");
-                       }
+               log.info("Initializing job {} ({}).", jobName, jid);
 
-                       if (jobGraph.getNumberOfVertices() == 0) {
-                               throw new 
JobSubmissionException(jobGraph.getJobID(), "The given job is empty");
-                       }
-
-                       final RestartStrategies.RestartStrategyConfiguration 
restartStrategyConfiguration =
+               final RestartStrategies.RestartStrategyConfiguration 
restartStrategyConfiguration =
                                jobGraph.getSerializedExecutionConfig()
-                                       .deserializeValue(userCodeLoader)
-                                       .getRestartStrategy();
-                       if (restartStrategyConfiguration != null) {
-                               restartStrategy = 
RestartStrategyFactory.createRestartStrategy(restartStrategyConfiguration);
-                       }
-                       else {
-                               restartStrategy = 
restartStrategyFactory.createRestartStrategy();
-                       }
+                                               
.deserializeValue(userCodeLoader)
+                                               .getRestartStrategy();
 
-                       log.info("Using restart strategy {} for {} ({}).", 
restartStrategy, jobGraph.getName(), jobGraph.getJobID());
+               final RestartStrategy restartStrategy = 
(restartStrategyConfiguration != null) ?
+                               
RestartStrategyFactory.createRestartStrategy(restartStrategyConfiguration) :
+                               restartStrategyFactory.createRestartStrategy();
 
-                       if (jobManagerMetricGroup != null) {
-                               jobMetrics = 
jobManagerMetricGroup.addJob(jobGraph);
-                       }
-                       if (jobMetrics == null) {
-                               jobMetrics = new UnregisteredMetricsGroup();
-                       }
+               log.info("Using restart strategy {} for {} ({}).", 
restartStrategy, jobName, jid);
 
-                       try {
-                               checkpointRecoveryFactory = 
highAvailabilityServices.getCheckpointRecoveryFactory();
-                       } catch (Exception e) {
-                               log.error("Could not get the checkpoint 
recovery factory.", e);
-                               throw new 
JobSubmissionException(jobGraph.getJobID(), "Could not get the checkpoint 
recovery factory.", e);
-                       }
+               CheckpointRecoveryFactory checkpointRecoveryFactory;
+               try {
+                       checkpointRecoveryFactory = 
highAvailabilityServices.getCheckpointRecoveryFactory();
+               } catch (Exception e) {
+                       log.error("Could not create the access to 
highly-available checkpoint storage.", e);
+                       throw new Exception("Could not create the access to 
highly-available checkpoint storage.", e);
+               }
 
-                       try {
-                               resourceManagerLeaderRetriever = 
highAvailabilityServices.getResourceManagerLeaderRetriever();
-                       } catch (Exception e) {
-                               log.error("Could not get the resource manager 
leader retriever.", e);
-                               throw new 
JobSubmissionException(jobGraph.getJobID(),
+               try {
+                       resourceManagerLeaderRetriever = 
highAvailabilityServices.getResourceManagerLeaderRetriever();
+               } catch (Exception e) {
+                       log.error("Could not get the resource manager leader 
retriever.", e);
+                       throw new JobSubmissionException(jobGraph.getJobID(),
                                        "Could not get the resource manager 
leader retriever.", e);
-                       }
-               } catch (Throwable t) {
-                       log.error("Failed to initializing job {} ({})", 
jobGraph.getName(), jobGraph.getJobID(), t);
-
-                       libraryCacheManager.unregisterJob(jobGraph.getJobID());
-
-                       if (t instanceof JobSubmissionException) {
-                               throw (JobSubmissionException) t;
-                       }
-                       else {
-                               throw new 
JobSubmissionException(jobGraph.getJobID(), "Failed to initialize job " +
-                                       jobGraph.getName() + " (" + 
jobGraph.getJobID() + ")", t);
-                       }
                }
+
+               this.executionGraph = ExecutionGraphBuilder.buildGraph(
+                               null,
+                               jobGraph,
+                               configuration,
+                               executorService,
+                               executorService,
+                               userCodeLoader,
+                               checkpointRecoveryFactory,
+                               rpcAskTimeout,
+                               restartStrategy,
+                               jobMetricGroup,
+                               -1,
+                               log);
+
+               // TODO - temp fix
+               this.scheduler = new Scheduler(executorService);
        }
 
+       
//----------------------------------------------------------------------------------------------
+       // Lifecycle management
+       
//----------------------------------------------------------------------------------------------
+
+
        @Override
        public void start() {
-               super.start();
+               throw new UnsupportedOperationException("Should never call 
start() without leader ID");
        }
 
+       /**
+        * Start the rpc service and begin to run the job.
+        *
+        * @param leaderSessionID The necessary leader id for running the job.
+        */
+       public void start(final UUID leaderSessionID) {
+               if (LEADER_ID_UPDATER.compareAndSet(this, null, 
leaderSessionID)) {
+                       super.start();
+
+                       log.info("Starting JobManager for job {} ({})", 
jobGraph.getName(), jobGraph.getJobID());
+                       getSelf().startJobExecution();
+               } else {
+                       log.warn("Job already started with leaderId {}, 
ignoring this start request.", leaderSessionID);
+               }
+       }
+
+       /**
+        * Suspend the job and shutdown all other services including rpc.
+        */
        @Override
        public void shutDown() {
+               // make sure there is a graceful exit
+               getSelf().suspendExecution(new Exception("JobManager is 
shutting down."));
                super.shutDown();
-
-               suspendJob(new Exception("JobManager is shutting down."));
-
-               disposeCommunicationWithResourceManager();
        }
 
-
-
        
//----------------------------------------------------------------------------------------------
        // RPC methods
        
//----------------------------------------------------------------------------------------------
 
-       /**
-        * Start to run the job, runtime data structures like ExecutionGraph 
will be constructed now and checkpoint
-        * being recovered. After this, we will begin to schedule the job.
-        */
-       @RpcMethod
-       public void startJob(final UUID leaderSessionID) {
-               log.info("Starting job {} ({}) with leaderId {}.", 
jobGraph.getName(), jobGraph.getJobID(), leaderSessionID);
+       //-- job starting and stopping  
-----------------------------------------------------------------
 
-               this.leaderSessionID = leaderSessionID;
+       @RpcMethod
+       public void startJobExecution() {
+               log.info("Starting execution of job {} ({}) with leaderId {}.",
+                               jobGraph.getName(), jobGraph.getJobID(), 
leaderSessionID);
 
                try {
-                       if (executionGraph != null) {
-                               executionGraph = new ExecutionGraph(
-                                               executionContext,
-                                               executionContext,
-                                               jobGraph.getJobID(),
-                                               jobGraph.getName(),
-                                               jobGraph.getJobConfiguration(),
-                                               
jobGraph.getSerializedExecutionConfig(),
-                                               timeout,
-                                               restartStrategy,
-                                               jobGraph.getUserJarBlobKeys(),
-                                               jobGraph.getClasspaths(),
-                                               userCodeLoader,
-                                               jobMetrics);
-                       } else {
-                               // TODO: update last active time in JobInfo
-                       }
-
-                       
executionGraph.setScheduleMode(jobGraph.getScheduleMode());
-                       
executionGraph.setQueuedSchedulingAllowed(jobGraph.getAllowQueuedScheduling());
-
-                       try {
-                               
executionGraph.setJsonPlan(JsonPlanGenerator.generatePlan(jobGraph));
-                       } catch (Exception e) {
-                               log.warn("Cannot create JSON plan for job {} 
({})", jobGraph.getJobID(), jobGraph.getName(), e);
-                               executionGraph.setJsonPlan("{}");
-                       }
-
-                       // initialize the vertices that have a master 
initialization hook
-                       // file output formats create directories here, input 
formats create splits
-                       if (log.isDebugEnabled()) {
-                               log.debug("Running initialization on master for 
job {} ({}).", jobGraph.getJobID(), jobGraph.getName());
-                       }
-                       for (JobVertex vertex : jobGraph.getVertices()) {
-                               final String executableClass = 
vertex.getInvokableClassName();
-                               if (executableClass == null || 
executableClass.length() == 0) {
-                                       throw new 
JobExecutionException(jobGraph.getJobID(),
-                                               "The vertex " + vertex.getID() 
+ " (" + vertex.getName() + ") has no invokable class.");
-                               }
-                               if (vertex.getParallelism() == 
ExecutionConfig.PARALLELISM_AUTO_MAX) {
-                                       
vertex.setParallelism(scheduler.getTotalNumberOfSlots());
-                               }
-
-                               try {
-                                       
vertex.initializeOnMaster(userCodeLoader);
-                               } catch (Throwable t) {
-                                       throw new 
JobExecutionException(jobGraph.getJobID(),
-                                               "Cannot initialize task '" + 
vertex.getName() + "': " + t.getMessage(), t);
-                               }
-                       }
-
-                       // topologically sort the job vertices and attach the 
graph to the existing one
-                       final List<JobVertex> sortedTopology = 
jobGraph.getVerticesSortedTopologicallyFromSources();
-                       if (log.isDebugEnabled()) {
-                               log.debug("Adding {} vertices from job graph {} 
({}).", sortedTopology.size(),
-                                       jobGraph.getJobID(), 
jobGraph.getName());
-                       }
-                       executionGraph.attachJobGraph(sortedTopology);
-
-                       if (log.isDebugEnabled()) {
-                               log.debug("Successfully created execution graph 
from job graph {} ({}).",
-                                       jobGraph.getJobID(), 
jobGraph.getName());
-                       }
-
-                       final JobSnapshottingSettings snapshotSettings = 
jobGraph.getSnapshotSettings();
-                       if (snapshotSettings != null) {
-                               List<ExecutionJobVertex> triggerVertices = 
getExecutionJobVertexWithId(
-                                       executionGraph, 
snapshotSettings.getVerticesToTrigger());
-
-                               List<ExecutionJobVertex> ackVertices = 
getExecutionJobVertexWithId(
-                                       executionGraph, 
snapshotSettings.getVerticesToAcknowledge());
-
-                               List<ExecutionJobVertex> confirmVertices = 
getExecutionJobVertexWithId(
-                                       executionGraph, 
snapshotSettings.getVerticesToConfirm());
-
-                               CompletedCheckpointStore completedCheckpoints = 
checkpointRecoveryFactory.createCheckpointStore(
-                                       jobGraph.getJobID(), userCodeLoader);
-
-                               CheckpointIDCounter checkpointIdCounter = 
checkpointRecoveryFactory.createCheckpointIDCounter(
-                                       jobGraph.getJobID());
-
-                               // Checkpoint stats tracker
-                               boolean isStatsDisabled = 
configuration.getBoolean(
-                                       
ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_DISABLE,
-                                       
ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_DISABLE);
-
-                               final CheckpointStatsTracker 
checkpointStatsTracker;
-                               if (isStatsDisabled) {
-                                       checkpointStatsTracker = new 
DisabledCheckpointStatsTracker();
-                               }
-                               else {
-                                       int historySize = 
configuration.getInteger(
-                                               
ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE,
-                                               
ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE);
-                                       checkpointStatsTracker = new 
SimpleCheckpointStatsTracker(historySize, ackVertices, jobMetrics);
-                               }
-
-                               String externalizedCheckpointsDir = 
configuration.getString(
-                                               
ConfigConstants.CHECKPOINTS_DIRECTORY_KEY, null);
-
-                               executionGraph.enableSnapshotCheckpointing(
-                                       
snapshotSettings.getCheckpointInterval(),
-                                       snapshotSettings.getCheckpointTimeout(),
-                                       
snapshotSettings.getMinPauseBetweenCheckpoints(),
-                                       
snapshotSettings.getMaxConcurrentCheckpoints(),
-                                       
snapshotSettings.getExternalizedCheckpointSettings(),
-                                       triggerVertices,
-                                       ackVertices,
-                                       confirmVertices,
-                                       checkpointIdCounter,
-                                       completedCheckpoints,
-                                       externalizedCheckpointsDir,
-                                       checkpointStatsTracker);
-                       }
-
-                       // TODO: register this class to execution graph as job 
status change listeners
-
-                       // TODO: register client as job / execution status 
change listeners if they are interested
-
-                       /*
-                       TODO: decide whether we should take the savepoint 
before recovery
-
-                       if (isRecovery) {
-                               // this is a recovery of a master failure (this 
master takes over)
-                               executionGraph.restoreLatestCheckpointedState();
-                       } else {
-                               if (snapshotSettings != null) {
-                                       String savepointPath = 
snapshotSettings.getSavepointPath();
-                                       if (savepointPath != null) {
-                                               // got a savepoint
-                                               log.info("Starting job from 
savepoint {}.", savepointPath);
-
-                                               // load the savepoint as a 
checkpoint into the system
-                                               final CompletedCheckpoint 
savepoint = SavepointLoader.loadAndValidateSavepoint(
-                                                       jobGraph.getJobID(), 
executionGraph.getAllVertices(), savepointStore, savepointPath);
-                                               
executionGraph.getCheckpointCoordinator().getCheckpointStore().addCheckpoint(savepoint);
-
-                                               // Reset the checkpoint ID 
counter
-                                               long nextCheckpointId = 
savepoint.getCheckpointID() + 1;
-                                               log.info("Reset the checkpoint 
ID to " + nextCheckpointId);
-                                               
executionGraph.getCheckpointCoordinator().getCheckpointIdCounter().setCount(nextCheckpointId);
-
-                                               
executionGraph.restoreLatestCheckpointedState();
-                                       }
+                       // register self as job status change listener
+                       executionGraph.registerJobStatusListener(new 
JobStatusListener() {
+                               @Override
+                               public void jobStatusChanges(
+                                               final JobID jobId, final 
JobStatus newJobStatus, final long timestamp, final Throwable error)
+                               {
+                                       // run in rpc thread to avoid 
concurrency
+                                       runAsync(new Runnable() {
+                                               @Override
+                                               public void run() {
+                                                       
jobStatusChanged(newJobStatus, timestamp, error);
+                                               }
+                                       });
                                }
-                       }
-                       */
+                       });
 
-                       // job is good to go, try to locate resource manager's 
address
+                       // job is ready to go, try to establish connection with 
resource manager
                        resourceManagerLeaderRetriever.start(new 
ResourceManagerLeaderListener());
                } catch (Throwable t) {
+
+                       // TODO - this should not result in a job failure, but 
another leader should take over
+                       // TODO - either this master should retry the 
execution, or it should relinquish leadership / terminate
+
                        log.error("Failed to start job {} ({})", 
jobGraph.getName(), jobGraph.getJobID(), t);
 
                        executionGraph.fail(t);
-                       executionGraph = null;
 
-                       final Throwable rt;
+                       final JobExecutionException rt;
                        if (t instanceof JobExecutionException) {
                                rt = (JobExecutionException) t;
-                       }
-                       else {
+                       } else {
                                rt = new 
JobExecutionException(jobGraph.getJobID(),
-                                       "Failed to start job " + 
jobGraph.getJobID() + " (" + jobGraph.getName() + ")", t);
+                                               "Failed to start job " + 
jobGraph.getJobID() + " (" + jobGraph.getName() + ")", t);
                        }
 
                        // TODO: notify client about this failure
@@ -490,34 +336,51 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
                executionContext.execute(new Runnable() {
                        @Override
                        public void run() {
-                               if (executionGraph != null) {
-                                       try {
-                                               
executionGraph.scheduleForExecution(scheduler);
-                                       } catch (Throwable t) {
-                                               executionGraph.fail(t);
-                                       }
+                               try {
+                                       
executionGraph.scheduleForExecution(scheduler);
+                               } catch (Throwable t) {
+                                       executionGraph.fail(t);
                                }
                        }
                });
        }
 
        /**
-        * Suspending job, all the running tasks will be cancelled, and runtime 
data will be cleared.
+        * Suspending job, all the running tasks will be cancelled, and 
communication with other components
+        * will be disposed.
+        *
+        * <p>Mostly job is suspended because of the leadership has been 
revoked, one can be restart this job by
+        * calling the {@link #start(UUID)} method once we take the leadership 
back again.
         *
         * @param cause The reason of why this job been suspended.
         */
        @RpcMethod
-       public void suspendJob(final Throwable cause) {
+       public void suspendExecution(final Throwable cause) {
+               if (leaderSessionID == null) {
+                       log.debug("Job has already been suspended or 
shutdown.");
+                       return;
+               }
+
+               // receive no more messages until started again, should be 
called before we clear self leader id
+               ((StartStoppable) getSelf()).stop();
+
                leaderSessionID = null;
+               executionGraph.suspend(cause);
 
-               if (executionGraph != null) {
-                       executionGraph.suspend(cause);
-                       executionGraph = null;
+               // disconnect from resource manager:
+               try {
+                       resourceManagerLeaderRetriever.stop();
+               } catch (Exception e) {
+                       log.warn("Failed to stop resource manager leader 
retriever when suspending.");
                }
+               closeResourceManagerConnection();
+
+               // TODO: disconnect from all registered task managers
 
-               disposeCommunicationWithResourceManager();
        }
 
+       
//----------------------------------------------------------------------------------------------
+
        /**
         * Updates the task execution state for a given task.
         *
@@ -525,26 +388,38 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
         * @return Acknowledge the task execution state update
         */
        @RpcMethod
-       public Acknowledge updateTaskExecutionState(final TaskExecutionState 
taskExecutionState) throws ExecutionGraphException {
+       public Acknowledge updateTaskExecutionState(
+                       final UUID leaderSessionID,
+                       final TaskExecutionState taskExecutionState) throws 
Exception
+       {
                if (taskExecutionState == null) {
                        throw new NullPointerException("TaskExecutionState must 
not be null.");
                }
 
+               if (!this.leaderSessionID.equals(leaderSessionID)) {
+                       throw new Exception("Leader id not match, expected: " + 
this.leaderSessionID
+                                       + ", actual: " + leaderSessionID);
+               }
+
                if (executionGraph.updateState(taskExecutionState)) {
                        return Acknowledge.get();
                } else {
                        throw new ExecutionGraphException("The execution 
attempt " +
-                               taskExecutionState.getID() + " was not found.");
+                                       taskExecutionState.getID() + " was not 
found.");
                }
-
        }
 
-
        @RpcMethod
        public SerializedInputSplit requestNextInputSplit(
-               final JobVertexID vertexID,
-               final ExecutionAttemptID executionAttempt) throws Exception
+                       final UUID leaderSessionID,
+                       final JobVertexID vertexID,
+                       final ExecutionAttemptID executionAttempt) throws 
Exception
        {
+               if (!this.leaderSessionID.equals(leaderSessionID)) {
+                       throw new Exception("Leader id not match, expected: " + 
this.leaderSessionID
+                                       + ", actual: " + leaderSessionID);
+               }
+
                final Execution execution = 
executionGraph.getRegisteredExecutions().get(executionAttempt);
                if (execution == null) {
                        // can happen when JobManager had already unregistered 
this execution upon on task failure,
@@ -583,7 +458,7 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
                } catch (Exception ex) {
                        log.error("Could not serialize the next input split of 
class {}.", nextInputSplit.getClass(), ex);
                        IOException reason = new IOException("Could not 
serialize the next input split of class " +
-                               nextInputSplit.getClass() + ".", ex);
+                                       nextInputSplit.getClass() + ".", ex);
                        vertex.fail(reason);
                        throw reason;
                }
@@ -591,16 +466,21 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
 
        @RpcMethod
        public ExecutionState requestPartitionState(
-               final JobID ignored,
-               final IntermediateDataSetID intermediateResultId,
-               final ResultPartitionID resultPartitionId) throws 
PartitionProducerDisposedException {
+                       final UUID leaderSessionID,
+                       final IntermediateDataSetID intermediateResultId,
+                       final ResultPartitionID resultPartitionId) throws 
Exception {
+
+               if (!this.leaderSessionID.equals(leaderSessionID)) {
+                       throw new Exception("Leader id not match, expected: " + 
this.leaderSessionID
+                                       + ", actual: " + leaderSessionID);
+               }
 
                final Execution execution = 
executionGraph.getRegisteredExecutions().get(resultPartitionId.getProducerId());
                if (execution != null) {
                        return execution.getState();
                }
                else {
-                       final IntermediateResult intermediateResult = 
+                       final IntermediateResult intermediateResult =
                                        
executionGraph.getAllIntermediateResults().get(intermediateResultId);
 
                        if (intermediateResult != null) {
@@ -623,7 +503,15 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
        }
 
        @RpcMethod
-       public Acknowledge scheduleOrUpdateConsumers(ResultPartitionID 
partitionID) throws ExecutionGraphException {
+       public Acknowledge scheduleOrUpdateConsumers(
+                       final UUID leaderSessionID,
+                       final ResultPartitionID partitionID) throws Exception
+       {
+               if (!this.leaderSessionID.equals(leaderSessionID)) {
+                       throw new Exception("Leader id not match, expected: " + 
this.leaderSessionID
+                                       + ", actual: " + leaderSessionID);
+               }
+
                executionGraph.scheduleOrUpdateConsumers(partitionID);
                return Acknowledge.get();
        }
@@ -638,171 +526,118 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
                        final JobID jobID,
                        final ExecutionAttemptID executionAttemptID,
                        final CheckpointMetaData checkpointInfo,
-                       final SubtaskState checkpointStateHandles) {
+                       final SubtaskState checkpointState) throws 
CheckpointException {
 
-               throw new UnsupportedOperationException();
+               final CheckpointCoordinator checkpointCoordinator = 
executionGraph.getCheckpointCoordinator();
+               final AcknowledgeCheckpoint ackMessage = 
+                               new AcknowledgeCheckpoint(jobID, 
executionAttemptID, checkpointInfo, checkpointState);
+
+               if (checkpointCoordinator != null) {
+                       getRpcService().execute(new Runnable() {
+                               @Override
+                               public void run() {
+                                       try {
+                                               
checkpointCoordinator.receiveAcknowledgeMessage(ackMessage);
+                                       } catch (Throwable t) {
+                                               log.warn("Error while 
processing checkpoint acknowledgement message");
+                                       }
+                               }
+                       });
+               } else {
+                       log.error("Received AcknowledgeCheckpoint message for 
job {} with no CheckpointCoordinator",
+                                       jobGraph.getJobID());
+               }
        }
 
        @RpcMethod
        public void declineCheckpoint(
                        final JobID jobID,
                        final ExecutionAttemptID executionAttemptID,
-                       final long checkpointId,
-                       final Throwable cause) {
-
-               throw new UnsupportedOperationException();
-       }
-
-       
//----------------------------------------------------------------------------------------------
-       // Internal methods
-       
//----------------------------------------------------------------------------------------------
-
-       @RpcMethod
-       public void resourceRemoved(final ResourceID resourceId, final String 
message) {
-               // TODO: remove resource from slot pool
-       }
+                       final long checkpointID,
+                       final Throwable reason)
+       {
+               final DeclineCheckpoint decline = new DeclineCheckpoint(
+                               jobID, executionAttemptID, checkpointID, 
reason);
+               final CheckpointCoordinator checkpointCoordinator = 
executionGraph.getCheckpointCoordinator();
 
-       @RpcMethod
-       public void acknowledgeCheckpoint(final AcknowledgeCheckpoint 
acknowledge) {
-               if (executionGraph != null) {
-                       final CheckpointCoordinator checkpointCoordinator = 
executionGraph.getCheckpointCoordinator();
-                       if (checkpointCoordinator != null) {
-                               getRpcService().execute(new Runnable() {
-                                       @Override
-                                       public void run() {
-                                               try {
-                                                       if 
(!checkpointCoordinator.receiveAcknowledgeMessage(acknowledge)) {
-                                                               
log.info("Received message for non-existing checkpoint {}.",
-                                                                       
acknowledge.getCheckpointId());
-                                                       }
-                                               } catch (Exception e) {
-                                                       log.error("Error in 
CheckpointCoordinator while processing {}", acknowledge, e);
-                                               }
+               if (checkpointCoordinator != null) {
+                       getRpcService().execute(new Runnable() {
+                               @Override
+                               public void run() {
+                                       try {
+                                               
checkpointCoordinator.receiveDeclineMessage(decline);
+                                       } catch (Exception e) {
+                                               log.error("Error in 
CheckpointCoordinator while processing {}", decline, e);
                                        }
-                               });
-                       }
-                       else {
-                               log.error("Received AcknowledgeCheckpoint 
message for job {} with no CheckpointCoordinator",
-                                       jobGraph.getJobID());
-                       }
+                               }
+                       });
                } else {
-                       log.error("Received AcknowledgeCheckpoint for 
unavailable job {}", jobGraph.getJobID());
-               }
-       }
-
-       @RpcMethod
-       public void declineCheckpoint(final DeclineCheckpoint decline) {
-               if (executionGraph != null) {
-                       final CheckpointCoordinator checkpointCoordinator = 
executionGraph.getCheckpointCoordinator();
-                       if (checkpointCoordinator != null) {
-                               getRpcService().execute(new Runnable() {
-                                       @Override
-                                       public void run() {
-                                               try {
-                                                       log.info("Received 
message for non-existing checkpoint {}.", decline.getCheckpointId());
-                                               } catch (Exception e) {
-                                                       log.error("Error in 
CheckpointCoordinator while processing {}", decline, e);
-                                               }
-                                       }
-                               });
-                       } else {
-                               log.error("Received DeclineCheckpoint message 
for job {} with no CheckpointCoordinator",
+                       log.error("Received DeclineCheckpoint message for job 
{} with no CheckpointCoordinator",
                                        jobGraph.getJobID());
-                       }
-               } else {
-                       log.error("Received AcknowledgeCheckpoint for 
unavailable job {}", jobGraph.getJobID());
                }
        }
 
        @RpcMethod
        public KvStateLocation lookupKvStateLocation(final String 
registrationName) throws Exception {
-               if (executionGraph != null) {
-                       if (log.isDebugEnabled()) {
-                               log.debug("Lookup key-value state for job {} 
with registration " +
+               if (log.isDebugEnabled()) {
+                       log.debug("Lookup key-value state for job {} with 
registration " +
                                        "name {}.", jobGraph.getJobID(), 
registrationName);
-                       }
+               }
 
-                       final KvStateLocationRegistry registry = 
executionGraph.getKvStateLocationRegistry();
-                       final KvStateLocation location = 
registry.getKvStateLocation(registrationName);
-                       if (location != null) {
-                               return location;
-                       } else {
-                               throw new 
UnknownKvStateLocation(registrationName);
-                       }
+               final KvStateLocationRegistry registry = 
executionGraph.getKvStateLocationRegistry();
+               final KvStateLocation location = 
registry.getKvStateLocation(registrationName);
+               if (location != null) {
+                       return location;
                } else {
-                       throw new IllegalStateException("Received lookup 
KvState location request for unavailable job " +
-                               jobGraph.getJobID());
+                       throw new UnknownKvStateLocation(registrationName);
                }
        }
 
        @RpcMethod
        public void notifyKvStateRegistered(
-               final JobVertexID jobVertexId,
-               final KeyGroupRange keyGroupRange,
-               final String registrationName,
-               final KvStateID kvStateId,
-               final KvStateServerAddress kvStateServerAddress)
+                       final JobVertexID jobVertexId,
+                       final KeyGroupRange keyGroupRange,
+                       final String registrationName,
+                       final KvStateID kvStateId,
+                       final KvStateServerAddress kvStateServerAddress)
        {
-               if (executionGraph != null) {
-                       if (log.isDebugEnabled()) {
-                               log.debug("Key value state registered for job 
{} under name {}.",
+               if (log.isDebugEnabled()) {
+                       log.debug("Key value state registered for job {} under 
name {}.",
                                        jobGraph.getJobID(), registrationName);
-                       }
-                       try {
-                               
executionGraph.getKvStateLocationRegistry().notifyKvStateRegistered(
-                                       jobVertexId, keyGroupRange, 
registrationName, kvStateId, kvStateServerAddress
-                               );
-                       } catch (Exception e) {
-                               log.error("Failed to notify KvStateRegistry 
about registration {}.", registrationName);
-                       }
-               } else {
-                       log.error("Received notify KvState registered request 
for unavailable job " + jobGraph.getJobID());
+               }
+
+               try {
+                       
executionGraph.getKvStateLocationRegistry().notifyKvStateRegistered(
+                                       jobVertexId, keyGroupRange, 
registrationName, kvStateId, kvStateServerAddress);
+               } catch (Exception e) {
+                       log.error("Failed to notify KvStateRegistry about 
registration {}.", registrationName);
                }
        }
 
        @RpcMethod
        public void notifyKvStateUnregistered(
-               JobVertexID jobVertexId,
-               KeyGroupRange keyGroupRange,
-               String registrationName)
+                       JobVertexID jobVertexId,
+                       KeyGroupRange keyGroupRange,
+                       String registrationName)
        {
-               if (executionGraph != null) {
-                       if (log.isDebugEnabled()) {
-                               log.debug("Key value state unregistered for job 
{} under name {}.",
+               if (log.isDebugEnabled()) {
+                       log.debug("Key value state unregistered for job {} 
under name {}.",
                                        jobGraph.getJobID(), registrationName);
-                       }
-                       try {
-                               
executionGraph.getKvStateLocationRegistry().notifyKvStateUnregistered(
-                                       jobVertexId, keyGroupRange, 
registrationName
-                               );
-                       } catch (Exception e) {
-                               log.error("Failed to notify KvStateRegistry 
about registration {}.", registrationName);
-                       }
-               } else {
-                       log.error("Received notify KvState unregistered request 
for unavailable job " + jobGraph.getJobID());
                }
-       }
 
-       @RpcMethod
-       public Future<TriggerSavepointResponse> triggerSavepoint() throws 
Exception {
-               return null;
-       }
-
-       @RpcMethod
-       public DisposeSavepointResponse disposeSavepoint(final String 
savepointPath) {
-               // TODO
-               return null;
+               try {
+                       
executionGraph.getKvStateLocationRegistry().notifyKvStateUnregistered(
+                                       jobVertexId, keyGroupRange, 
registrationName);
+               } catch (Exception e) {
+                       log.error("Failed to notify KvStateRegistry about 
registration {}.", registrationName);
+               }
        }
 
        @RpcMethod
        public ClassloadingProps requestClassloadingProps() throws Exception {
-               if (executionGraph != null) {
-                       return new 
ClassloadingProps(libraryCacheManager.getBlobServerPort(),
+               return new 
ClassloadingProps(libraryCacheManager.getBlobServerPort(),
                                executionGraph.getRequiredJarFiles(),
                                executionGraph.getRequiredClasspaths());
-               } else {
-                       throw new Exception("Received classloading props 
request for unavailable job " + jobGraph.getJobID());
-               }
        }
 
        
//----------------------------------------------------------------------------------------------
@@ -815,12 +650,11 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
                        public void run() {
                                log.error("Fatal error occurred on JobManager, 
cause: {}", cause.getMessage(), cause);
                                shutDown();
-                               jobCompletionActions.onFatalError(cause);
+                               errorHandler.onFatalError(cause);
                        }
                });
        }
 
-       // TODO - wrap this as StatusListenerMessenger's callback with rpc main 
thread
        private void jobStatusChanged(final JobStatus newJobStatus, long 
timestamp, final Throwable error) {
                final JobID jobID = executionGraph.getJobID();
                final String jobName = executionGraph.getJobName();
@@ -848,36 +682,33 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
                        if (newJobStatus == JobStatus.FINISHED) {
                                try {
                                        final Map<String, 
SerializedValue<Object>> accumulatorResults =
-                                               
executionGraph.getAccumulatorsSerialized();
+                                                       
executionGraph.getAccumulatorsSerialized();
                                        final SerializedJobExecutionResult 
result = new SerializedJobExecutionResult(
-                                               jobID, 0, accumulatorResults // 
TODO get correct job duration
+                                                       jobID, 0, 
accumulatorResults // TODO get correct job duration
                                        );
                                        
jobCompletionActions.jobFinished(result.toJobExecutionResult(userCodeLoader));
                                } catch (Exception e) {
                                        log.error("Cannot fetch final 
accumulators for job {} ({})", jobName, jobID, e);
                                        final JobExecutionException exception = 
new JobExecutionException(
-                                               jobID, "Failed to retrieve 
accumulator results.", e);
+                                                       jobID, "Failed to 
retrieve accumulator results.", e);
                                        // TODO should we also notify client?
                                        
jobCompletionActions.jobFailed(exception);
                                }
-                       }
-                       else if (newJobStatus == JobStatus.CANCELED) {
+                       } else if (newJobStatus == JobStatus.CANCELED) {
                                final Throwable unpackedError = 
SerializedThrowable.get(error, userCodeLoader);
                                final JobExecutionException exception = new 
JobExecutionException(
-                                       jobID, "Job was cancelled.", 
unpackedError);
+                                               jobID, "Job was cancelled.", 
unpackedError);
                                // TODO should we also notify client?
                                jobCompletionActions.jobFailed(exception);
-                       }
-                       else if (newJobStatus == JobStatus.FAILED) {
+                       } else if (newJobStatus == JobStatus.FAILED) {
                                final Throwable unpackedError = 
SerializedThrowable.get(error, userCodeLoader);
                                final JobExecutionException exception = new 
JobExecutionException(
-                                       jobID, "Job execution failed.", 
unpackedError);
+                                               jobID, "Job execution failed.", 
unpackedError);
                                // TODO should we also notify client?
                                jobCompletionActions.jobFailed(exception);
-                       }
-                       else {
+                       } else {
                                final JobExecutionException exception = new 
JobExecutionException(
-                                       jobID, newJobStatus + " is not a 
terminal state.");
+                                               jobID, newJobStatus + " is not 
a terminal state.");
                                // TODO should we also notify client?
                                jobCompletionActions.jobFailed(exception);
                                throw new RuntimeException(exception);
@@ -886,7 +717,7 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
        }
 
        private void notifyOfNewResourceManagerLeader(
-               final String resourceManagerAddress, final UUID 
resourceManagerLeaderId)
+                       final String resourceManagerAddress, final UUID 
resourceManagerLeaderId)
        {
                // IMPORTANT: executed by main thread to avoid concurrence
                runAsync(new Runnable() {
@@ -895,17 +726,15 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
                                if (resourceManagerConnection != null) {
                                        if (resourceManagerAddress != null) {
                                                if 
(resourceManagerAddress.equals(resourceManagerConnection.getTargetAddress())
-                                                       && 
resourceManagerLeaderId.equals(resourceManagerConnection.getTargetLeaderId()))
-                                               {
+                                                               && 
resourceManagerLeaderId.equals(resourceManagerConnection.getTargetLeaderId())) {
                                                        // both address and 
leader id are not changed, we can keep the old connection
                                                        return;
                                                }
                                                log.info("ResourceManager 
leader changed from {} to {}. Registering at new leader.",
-                                                       
resourceManagerConnection.getTargetAddress(), resourceManagerAddress);
-                                       }
-                                       else {
+                                                               
resourceManagerConnection.getTargetAddress(), resourceManagerAddress);
+                                       } else {
                                                log.info("Current 
ResourceManager {} lost leader status. Waiting for new ResourceManager leader.",
-                                                       
resourceManagerConnection.getTargetAddress());
+                                                               
resourceManagerConnection.getTargetAddress());
                                        }
                                }
 
@@ -914,8 +743,8 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
                                if (resourceManagerAddress != null) {
                                        log.info("Attempting to register at 
ResourceManager {}", resourceManagerAddress);
                                        resourceManagerConnection = new 
ResourceManagerConnection(
-                                               log, jobGraph.getJobID(), 
leaderSessionID,
-                                               resourceManagerAddress, 
resourceManagerLeaderId, executionContext);
+                                                       log, 
jobGraph.getJobID(), leaderSessionID,
+                                                       resourceManagerAddress, 
resourceManagerLeaderId, executionContext);
                                        resourceManagerConnection.start();
                                }
                        }
@@ -929,26 +758,14 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
                                // TODO - add tests for comment in 
https://github.com/apache/flink/pull/2565
                                // verify the response with current connection
                                if (resourceManagerConnection != null
-                                       && 
resourceManagerConnection.getTargetLeaderId().equals(success.getResourceManagerLeaderId()))
 {
+                                               && 
resourceManagerConnection.getTargetLeaderId().equals(success.getResourceManagerLeaderId()))
 {
                                        log.info("JobManager successfully 
registered at ResourceManager, leader id: {}.",
-                                               
success.getResourceManagerLeaderId());
+                                                       
success.getResourceManagerLeaderId());
                                }
                        }
                });
        }
 
-       private void disposeCommunicationWithResourceManager() {
-               // 1. stop the leader retriever so we will not receiving 
updates anymore
-               try {
-                       resourceManagerLeaderRetriever.stop();
-               } catch (Exception e) {
-                       log.warn("Failed to stop resource manager leader 
retriever.");
-               }
-
-               // 2. close current connection with ResourceManager if exists
-               closeResourceManagerConnection();
-       }
-
        private void closeResourceManagerConnection() {
                if (resourceManagerConnection != null) {
                        resourceManagerConnection.close();
@@ -957,34 +774,6 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
        }
 
        
//----------------------------------------------------------------------------------------------
-       // Helper methods
-       
//----------------------------------------------------------------------------------------------
-
-       /**
-        * Converts JobVertexIDs to corresponding ExecutionJobVertexes
-        *
-        * @param executionGraph The execution graph that holds the relationship
-        * @param vertexIDs      The vertexIDs need to be converted
-        * @return The corresponding ExecutionJobVertexes
-        * @throws JobExecutionException
-        */
-       private static List<ExecutionJobVertex> getExecutionJobVertexWithId(
-               final ExecutionGraph executionGraph, final List<JobVertexID> 
vertexIDs)
-               throws JobExecutionException
-       {
-               final List<ExecutionJobVertex> ret = new 
ArrayList<>(vertexIDs.size());
-               for (JobVertexID vertexID : vertexIDs) {
-                       final ExecutionJobVertex executionJobVertex = 
executionGraph.getJobVertex(vertexID);
-                       if (executionJobVertex == null) {
-                               throw new 
JobExecutionException(executionGraph.getJobID(),
-                                       "The snapshot checkpointing settings 
refer to non-existent vertex " + vertexID);
-                       }
-                       ret.add(executionJobVertex);
-               }
-               return ret;
-       }
-
-       
//----------------------------------------------------------------------------------------------
        // Utility classes
        
//----------------------------------------------------------------------------------------------
 
@@ -1001,19 +790,19 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
        }
 
        private class ResourceManagerConnection
-               extends RegisteredRpcConnection<ResourceManagerGateway, 
JobMasterRegistrationSuccess>
+                       extends RegisteredRpcConnection<ResourceManagerGateway, 
JobMasterRegistrationSuccess>
        {
                private final JobID jobID;
 
                private final UUID jobManagerLeaderID;
 
                ResourceManagerConnection(
-                       final Logger log,
-                       final JobID jobID,
-                       final UUID jobManagerLeaderID,
-                       final String resourceManagerAddress,
-                       final UUID resourceManagerLeaderID,
-                       final Executor executor)
+                               final Logger log,
+                               final JobID jobID,
+                               final UUID jobManagerLeaderID,
+                               final String resourceManagerAddress,
+                               final UUID resourceManagerLeaderID,
+                               final Executor executor)
                {
                        super(log, resourceManagerAddress, 
resourceManagerLeaderID, executor);
                        this.jobID = checkNotNull(jobID);
@@ -1023,12 +812,12 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
                @Override
                protected RetryingRegistration<ResourceManagerGateway, 
JobMasterRegistrationSuccess> generateRegistration() {
                        return new RetryingRegistration<ResourceManagerGateway, 
JobMasterRegistrationSuccess>(
-                               log, getRpcService(), "ResourceManager", 
ResourceManagerGateway.class,
-                               getTargetAddress(), getTargetLeaderId())
+                                       log, getRpcService(), 
"ResourceManager", ResourceManagerGateway.class,
+                                       getTargetAddress(), getTargetLeaderId())
                        {
                                @Override
                                protected Future<RegistrationResponse> 
invokeRegistration(ResourceManagerGateway gateway, UUID leaderId,
-                                       long timeoutMillis) throws Exception
+                                               long timeoutMillis) throws 
Exception
                                {
                                        Time timeout = 
Time.milliseconds(timeoutMillis);
                                        return 
gateway.registerJobMaster(leaderId, jobManagerLeaderID, getAddress(), jobID, 
timeout);

http://git-wip-us.apache.org/repos/asf/flink/blob/c6486067/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index 5223b3e..daa33a3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.jobmaster;
 
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorGateway;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -32,11 +31,7 @@ import 
org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmaster.message.ClassloadingProps;
-import org.apache.flink.runtime.jobmaster.message.DisposeSavepointResponse;
-import org.apache.flink.runtime.jobmaster.message.TriggerSavepointResponse;
 import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
-import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.query.KvStateID;
 import org.apache.flink.runtime.query.KvStateLocation;
 import org.apache.flink.runtime.query.KvStateServerAddress;
@@ -52,52 +47,54 @@ import java.util.UUID;
  */
 public interface JobMasterGateway extends CheckpointCoordinatorGateway {
 
-       /**
-        * Starting the job under the given leader session ID.
-        */
-       void startJob(final UUID leaderSessionID);
+       // 
------------------------------------------------------------------------
+       //  Job start and stop methods
+       // 
------------------------------------------------------------------------
 
-       /**
-        * Suspending job, all the running tasks will be cancelled, and runtime 
status will be cleared.
-        * Should re-submit the job before restarting it.
-        *
-        * @param cause The reason of why this job been suspended.
-        */
-       void suspendJob(final Throwable cause);
+       void startJobExecution();
+
+       void suspendExecution(Throwable cause);
+
+       // 
------------------------------------------------------------------------
 
        /**
         * Updates the task execution state for a given task.
         *
+        * @param leaderSessionID    The leader id of JobManager
         * @param taskExecutionState New task execution state for a given task
         * @return Future flag of the task execution state update result
         */
-       Future<Acknowledge> updateTaskExecutionState(TaskExecutionState 
taskExecutionState);
+       Future<Acknowledge> updateTaskExecutionState(
+                       final UUID leaderSessionID,
+                       final TaskExecutionState taskExecutionState);
 
        /**
         * Requesting next input split for the {@link ExecutionJobVertex}. The 
next input split is sent back to the sender
         * as a {@link SerializedInputSplit} message.
         *
+        * @param leaderSessionID  The leader id of JobManager
         * @param vertexID         The job vertex id
         * @param executionAttempt The execution attempt id
         * @return The future of the input split. If there is no further input 
split, will return an empty object.
         */
        Future<SerializedInputSplit> requestNextInputSplit(
-               final JobVertexID vertexID,
-               final ExecutionAttemptID executionAttempt);
+                       final UUID leaderSessionID,
+                       final JobVertexID vertexID,
+                       final ExecutionAttemptID executionAttempt);
 
        /**
-        * Requests the current state of the producer of an intermediate result 
partition.
+        * Requests the current state of the partition.
         * The state of a partition is currently bound to the state of the 
producing execution.
         *
-        * @param jobId                TheID of job that the intermediate 
result partition belongs to.
+        * @param leaderSessionID The leader id of JobManager
         * @param intermediateResultId The execution attempt ID of the task 
requesting the partition state.
         * @param partitionId          The partition ID of the partition to 
request the state of.
         * @return The future of the partition state
         */
        Future<ExecutionState> requestPartitionState(
-                       JobID jobId,
-                       IntermediateDataSetID intermediateResultId,
-                       ResultPartitionID partitionId);
+                       final UUID leaderSessionID,
+                       final IntermediateDataSetID intermediateResultId,
+                       final ResultPartitionID partitionId);
 
        /**
         * Notifies the JobManager about available data for a produced 
partition.
@@ -108,11 +105,15 @@ public interface JobMasterGateway extends 
CheckpointCoordinatorGateway {
         * <p>
         * The JobManager then can decide when to schedule the partition 
consumers of the given session.
         *
-        * @param partitionID The partition which has already produced data
-        * @param timeout before the rpc call fails
+        * @param leaderSessionID The leader id of JobManager
+        * @param partitionID     The partition which has already produced data
+        * @param timeout         before the rpc call fails
         * @return Future acknowledge of the schedule or update operation
         */
-       Future<Acknowledge> scheduleOrUpdateConsumers(final ResultPartitionID 
partitionID, @RpcTimeout final Time timeout);
+       Future<Acknowledge> scheduleOrUpdateConsumers(
+                       final UUID leaderSessionID,
+                       final ResultPartitionID partitionID,
+                       @RpcTimeout final Time timeout);
 
        /**
         * Disconnects the given {@link 
org.apache.flink.runtime.taskexecutor.TaskExecutor} from the
@@ -123,36 +124,12 @@ public interface JobMasterGateway extends 
CheckpointCoordinatorGateway {
        void disconnectTaskManager(ResourceID resourceID);
 
        /**
-        * Notifies the JobManager about the removal of a resource.
-        *
-        * @param resourceId The ID under which the resource is registered.
-        * @param message    Optional message with details, for logging and 
debugging.
-        */
-
-       void resourceRemoved(final ResourceID resourceId, final String message);
-
-       /**
-        * Notifies the JobManager that the checkpoint of an individual task is 
completed.
-        *
-        * @param acknowledge The acknowledge message of the checkpoint
-        */
-       void acknowledgeCheckpoint(final AcknowledgeCheckpoint acknowledge);
-
-       /**
-        * Notifies the JobManager that a checkpoint request could not be 
heeded.
-        * This can happen if a Task is already in RUNNING state but is 
internally not yet ready to perform checkpoints.
-        *
-        * @param decline The decline message of the checkpoint
-        */
-       void declineCheckpoint(final DeclineCheckpoint decline);
-
-       /**
         * Requests a {@link KvStateLocation} for the specified {@link KvState} 
registration name.
         *
         * @param registrationName Name under which the KvState has been 
registered.
         * @return Future of the requested {@link KvState} location
         */
-       Future<KvStateLocation> lookupKvStateLocation(final String 
registrationName) throws Exception;
+       Future<KvStateLocation> lookupKvStateLocation(final String 
registrationName);
 
        /**
         * @param jobVertexId          JobVertexID the KvState instance belongs 
to.
@@ -162,11 +139,11 @@ public interface JobMasterGateway extends 
CheckpointCoordinatorGateway {
         * @param kvStateServerAddress Server address where to find the KvState 
instance.
         */
        void notifyKvStateRegistered(
-               final JobVertexID jobVertexId,
-               final KeyGroupRange keyGroupRange,
-               final String registrationName,
-               final KvStateID kvStateId,
-               final KvStateServerAddress kvStateServerAddress);
+                       final JobVertexID jobVertexId,
+                       final KeyGroupRange keyGroupRange,
+                       final String registrationName,
+                       final KvStateID kvStateId,
+                       final KvStateServerAddress kvStateServerAddress);
 
        /**
         * @param jobVertexId      JobVertexID the KvState instance belongs to.
@@ -174,24 +151,9 @@ public interface JobMasterGateway extends 
CheckpointCoordinatorGateway {
         * @param registrationName Name under which the KvState has been 
registered.
         */
        void notifyKvStateUnregistered(
-               JobVertexID jobVertexId,
-               KeyGroupRange keyGroupRange,
-               String registrationName);
-
-       /**
-        * Notifies the JobManager to trigger a savepoint for this job.
-        *
-        * @return Future of the savepoint trigger response.
-        */
-       Future<TriggerSavepointResponse> triggerSavepoint();
-
-       /**
-        * Notifies the Jobmanager to dispose specified savepoint.
-        *
-        * @param savepointPath The path of the savepoint.
-        * @return The future of the savepoint disponse response.
-        */
-       Future<DisposeSavepointResponse> disposeSavepoint(final String 
savepointPath);
+                       JobVertexID jobVertexId,
+                       KeyGroupRange keyGroupRange,
+                       String registrationName);
 
        /**
         * Request the classloading props of this job.

http://git-wip-us.apache.org/repos/asf/flink/blob/c6486067/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniClusterJobDispatcher.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniClusterJobDispatcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniClusterJobDispatcher.java
index e8fb5bb..019ccfe 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniClusterJobDispatcher.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniClusterJobDispatcher.java
@@ -25,6 +25,8 @@ import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 
 import org.slf4j.Logger;
@@ -62,6 +64,9 @@ public class MiniClusterJobDispatcher {
        /** al the services that the JobManager needs, such as BLOB service, 
factories, etc */
        private final JobManagerServices jobManagerServices;
 
+       /** Registry for all metrics in the mini cluster */
+       private final MetricRegistry metricRegistry;
+
        /** The number of JobManagers to launch (more than one simulates a 
high-availability setup) */
        private final int numJobManagers;
 
@@ -86,8 +91,9 @@ public class MiniClusterJobDispatcher {
        public MiniClusterJobDispatcher(
                        Configuration config,
                        RpcService rpcService,
-                       HighAvailabilityServices haServices) throws Exception {
-               this(config, rpcService, haServices, 1);
+                       HighAvailabilityServices haServices,
+                       MetricRegistry metricRegistry) throws Exception {
+               this(config, rpcService, haServices, metricRegistry, 1);
        }
 
        /**
@@ -106,16 +112,18 @@ public class MiniClusterJobDispatcher {
                        Configuration config,
                        RpcService rpcService,
                        HighAvailabilityServices haServices,
+                       MetricRegistry metricRegistry,
                        int numJobManagers) throws Exception {
 
                checkArgument(numJobManagers >= 1);
                this.configuration = checkNotNull(config);
                this.rpcService = checkNotNull(rpcService);
                this.haServices = checkNotNull(haServices);
+               this.metricRegistry = checkNotNull(metricRegistry);
                this.numJobManagers = numJobManagers;
 
                LOG.info("Creating JobMaster services");
-               this.jobManagerServices = 
JobManagerServices.fromConfiguration(config);
+               this.jobManagerServices = 
JobManagerServices.fromConfiguration(config, haServices);
        }
 
        // 
------------------------------------------------------------------------
@@ -140,9 +148,8 @@ public class MiniClusterJobDispatcher {
                                if (runners != null) {
                                        this.runners = null;
 
-                                       Exception shutdownException = new 
Exception("The MiniCluster is shutting down");
                                        for (JobManagerRunner runner : runners) 
{
-                                               
runner.shutdown(shutdownException);
+                                               runner.shutdown();
                                        }
                                }
                        }
@@ -171,9 +178,9 @@ public class MiniClusterJobDispatcher {
                        checkState(!shutdown, "mini cluster is shut down");
                        checkState(runners == null, "mini cluster can only 
execute one job at a time");
 
-                       OnCompletionActions onJobCompletion = new 
DetachedFinalizer(numJobManagers);
+                       DetachedFinalizer finalizer = new 
DetachedFinalizer(numJobManagers);
 
-                       this.runners = startJobRunners(job, onJobCompletion);
+                       this.runners = startJobRunners(job, finalizer, 
finalizer);
                }
        }
 
@@ -191,17 +198,17 @@ public class MiniClusterJobDispatcher {
                checkNotNull(job);
                
                LOG.info("Received job for blocking execution {} ({})", 
job.getName(), job.getJobID());
-               final BlockingJobSync onJobCompletion = new 
BlockingJobSync(job.getJobID(), numJobManagers);
+               final BlockingJobSync sync = new 
BlockingJobSync(job.getJobID(), numJobManagers);
 
                synchronized (lock) {
                        checkState(!shutdown, "mini cluster is shut down");
                        checkState(runners == null, "mini cluster can only 
execute one job at a time");
 
-                       this.runners = startJobRunners(job, onJobCompletion);
+                       this.runners = startJobRunners(job, sync, sync);
                }
 
                try {
-                       return onJobCompletion.getResult();
+                       return sync.getResult();
                }
                finally {
                        // always clear the status for the next job
@@ -209,24 +216,26 @@ public class MiniClusterJobDispatcher {
                }
        }
 
-       private JobManagerRunner[] startJobRunners(JobGraph job, 
OnCompletionActions onCompletion) throws JobExecutionException {
+       private JobManagerRunner[] startJobRunners(
+                       JobGraph job,
+                       OnCompletionActions onCompletion,
+                       FatalErrorHandler errorHandler) throws 
JobExecutionException {
                LOG.info("Starting {} JobMaster(s) for job {} ({})", 
numJobManagers, job.getName(), job.getJobID());
 
                JobManagerRunner[] runners = new 
JobManagerRunner[numJobManagers];
                for (int i = 0; i < numJobManagers; i++) {
                        try {
                                runners[i] = new JobManagerRunner(job, 
configuration,
-                                               rpcService, haServices, 
jobManagerServices, onCompletion);
+                                               rpcService, haServices, 
jobManagerServices, metricRegistry, 
+                                               onCompletion, errorHandler);
                                runners[i].start();
                        }
                        catch (Throwable t) {
                                // shut down all the ones so far
-                               Exception shutdownCause = new Exception("Failed 
to properly start all mini cluster JobManagers", t);
-
                                for (int k = 0; k <= i; k++) {
                                        try {
                                                if (runners[i] != null) {
-                                                       
runners[i].shutdown(shutdownCause);
+                                                       runners[i].shutdown();
                                                }
                                        } catch (Throwable ignored) {
                                                // silent shutdown
@@ -244,15 +253,15 @@ public class MiniClusterJobDispatcher {
        //  test methods to simulate job master failures
        // 
------------------------------------------------------------------------
 
-       public void killJobMaster(int which) {
-               checkArgument(which >= 0 && which < numJobManagers, "no such 
job master");
-               checkState(!shutdown, "mini cluster is shut down");
-
-               JobManagerRunner[] runners = this.runners;
-               checkState(runners != null, "mini cluster it not executing a 
job right now");
-
-               runners[which].shutdown(new Throwable("kill JobManager"));
-       }
+//     public void killJobMaster(int which) {
+//             checkArgument(which >= 0 && which < numJobManagers, "no such 
job master");
+//             checkState(!shutdown, "mini cluster is shut down");
+//
+//             JobManagerRunner[] runners = this.runners;
+//             checkState(runners != null, "mini cluster it not executing a 
job right now");
+//
+//             runners[which].shutdown(new Throwable("kill JobManager"));
+//     }
 
        // 
------------------------------------------------------------------------
        //  utility classes
@@ -263,7 +272,7 @@ public class MiniClusterJobDispatcher {
         * In the case of a high-availability test setup, there may be multiple 
runners.
         * After that, it marks the mini cluster as ready to receive new jobs.
         */
-       private class DetachedFinalizer implements OnCompletionActions {
+       private class DetachedFinalizer implements OnCompletionActions, 
FatalErrorHandler {
 
                private final AtomicInteger numJobManagersToWaitFor;
 
@@ -308,7 +317,7 @@ public class MiniClusterJobDispatcher {
         * That way it is guaranteed that after the blocking job submit call 
returns,
         * the dispatcher is immediately free to accept another job.
         */
-       private static class BlockingJobSync implements OnCompletionActions {
+       private static class BlockingJobSync implements OnCompletionActions, 
FatalErrorHandler {
 
                private final JobID jobId;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c6486067/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/ClassloadingProps.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/ClassloadingProps.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/ClassloadingProps.java
index 520755d..572ba2f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/ClassloadingProps.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/ClassloadingProps.java
@@ -23,7 +23,6 @@ import org.apache.flink.runtime.blob.BlobKey;
 import java.io.Serializable;
 import java.net.URL;
 import java.util.Collection;
-import java.util.List;
 
 /**
  * The response of classloading props request to JobManager.

http://git-wip-us.apache.org/repos/asf/flink/blob/c6486067/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/DisposeSavepointResponse.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/DisposeSavepointResponse.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/DisposeSavepointResponse.java
deleted file mode 100644
index 42bfc71..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/DisposeSavepointResponse.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmaster.message;
-
-import java.io.Serializable;
-
-/**
- * The response of the dispose savepoint request to JobManager.
- */
-public abstract class DisposeSavepointResponse implements Serializable {
-
-       private static final long serialVersionUID = 6008792963949369567L;
-
-       public static class Success extends DisposeSavepointResponse implements 
Serializable {
-
-               private static final long serialVersionUID = 
1572462960008711415L;
-       }
-
-       public static class Failure extends DisposeSavepointResponse implements 
Serializable {
-
-               private static final long serialVersionUID = 
-7505308325483022458L;
-
-               private final Throwable cause;
-
-               public Failure(final Throwable cause) {
-                       this.cause = cause;
-               }
-
-               public Throwable getCause() {
-                       return cause;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6486067/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/TriggerSavepointResponse.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/TriggerSavepointResponse.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/TriggerSavepointResponse.java
deleted file mode 100644
index 0b0edc5..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/TriggerSavepointResponse.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmaster.message;
-
-import org.apache.flink.api.common.JobID;
-
-import java.io.Serializable;
-
-/**
- * The response of the trigger savepoint request to JobManager.
- */
-public abstract class TriggerSavepointResponse implements Serializable {
-
-       private static final long serialVersionUID = 3139327824611807707L;
-
-       private final JobID jobID;
-
-       public JobID getJobID() {
-               return jobID;
-       }
-
-       public TriggerSavepointResponse(final JobID jobID) {
-               this.jobID = jobID;
-       }
-
-       public static class Success extends TriggerSavepointResponse implements 
Serializable {
-
-               private static final long serialVersionUID = 
-1100637460388881776L;
-
-               private final String savepointPath;
-
-               public Success(final JobID jobID, final String savepointPath) {
-                       super(jobID);
-                       this.savepointPath = savepointPath;
-               }
-
-               public String getSavepointPath() {
-                       return savepointPath;
-               }
-       }
-
-       public static class Failure extends TriggerSavepointResponse implements 
Serializable {
-
-               private static final long serialVersionUID = 
-1668479003490615139L;
-
-               private final Throwable cause;
-
-               public Failure(final JobID jobID, final Throwable cause) {
-                       super(jobID);
-                       this.cause = cause;
-               }
-
-               public Throwable getCause() {
-                       return cause;
-               }
-       }
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/c6486067/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
index 2052f98..4b9100a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
@@ -33,8 +33,8 @@ import java.util.concurrent.TimeUnit;
 public interface RpcService {
 
        /**
-        * Return the address under which the rpc service can be reached. If 
the rpc service cannot be
-        * contacted remotely, then it will return an empty string.
+        * Return the hostname or host address under which the rpc service can 
be reached.
+        * If the rpc service cannot be contacted remotely, then it will return 
an empty string.
         *
         * @return Address of the rpc service or empty string if local rpc 
service
         */

http://git-wip-us.apache.org/repos/asf/flink/blob/c6486067/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java
index 72668d2..1b311e3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java
@@ -26,11 +26,16 @@ import org.apache.flink.runtime.taskmanager.CheckpointResponder;
 import org.apache.flink.runtime.taskmanager.TaskManagerActions;
 import org.apache.flink.util.Preconditions;
 
+import java.util.UUID;
+
 /**
  * Container class for JobManager specific communication utils used by the {@link TaskExecutor}.
  */
 public class JobManagerConnection {
 
+       // Job master leader session id
+       private final UUID jobMasterLeaderId;
+
        // Gateway to the job master
        private final JobMasterGateway jobMasterGateway;
 
@@ -50,13 +55,15 @@ public class JobManagerConnection {
        private final PartitionProducerStateChecker partitionStateChecker;
 
        public JobManagerConnection(
-               JobMasterGateway jobMasterGateway,
-               TaskManagerActions taskManagerActions,
-               CheckpointResponder checkpointResponder,
-               LibraryCacheManager libraryCacheManager,
-               ResultPartitionConsumableNotifier resultPartitionConsumableNotifier,
-               PartitionProducerStateChecker partitionStateChecker) {
-
+                       UUID jobMasterLeaderId,
+                       JobMasterGateway jobMasterGateway,
+                       TaskManagerActions taskManagerActions,
+                       CheckpointResponder checkpointResponder,
+                       LibraryCacheManager libraryCacheManager,
+                       ResultPartitionConsumableNotifier resultPartitionConsumableNotifier,
+                       PartitionProducerStateChecker partitionStateChecker)
+       {
+               this.jobMasterLeaderId = Preconditions.checkNotNull(jobMasterLeaderId);
                this.jobMasterGateway = Preconditions.checkNotNull(jobMasterGateway);
                this.taskManagerActions = Preconditions.checkNotNull(taskManagerActions);
                this.checkpointResponder = Preconditions.checkNotNull(checkpointResponder);
@@ -65,6 +72,10 @@ public class JobManagerConnection {
                this.partitionStateChecker = Preconditions.checkNotNull(partitionStateChecker);
        }
 
+       public UUID getJobMasterLeaderId() {
+               return jobMasterLeaderId;
+       }
+
        public JobMasterGateway getJobManagerGateway() {
                return jobMasterGateway;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/c6486067/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 36f108e..2389291 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -18,37 +18,46 @@
 
 package org.apache.flink.runtime.taskexecutor;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
 import org.apache.flink.runtime.blob.BlobCache;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
-import org.apache.flink.runtime.executiongraph.JobInformation;
-import org.apache.flink.runtime.executiongraph.TaskInformation;
-import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.concurrent.ApplyFunction;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.JobInformation;
 import org.apache.flink.runtime.executiongraph.PartitionInfo;
+import org.apache.flink.runtime.executiongraph.TaskInformation;
 import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
+import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRegistered;
-import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRejected;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
+import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRegistered;
+import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRejected;
 import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestReply;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcEndpoint;
+import org.apache.flink.runtime.rpc.RpcMethod;
+import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.taskexecutor.exceptions.CheckpointException;
 import org.apache.flink.runtime.taskexecutor.exceptions.PartitionException;
 import org.apache.flink.runtime.taskexecutor.exceptions.TaskException;
@@ -62,26 +71,16 @@ import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerActions;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.network.NetworkEnvironment;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
-import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.rpc.RpcEndpoint;
-import org.apache.flink.runtime.rpc.RpcMethod;
-import org.apache.flink.runtime.rpc.RpcService;
-
 import org.apache.flink.util.Preconditions;
 
-import java.util.HashSet;
-import java.util.Set;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -292,6 +291,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
                                tdd.getAttemptNumber());
 
                InputSplitProvider inputSplitProvider = new RpcInputSplitProvider(
+                               jobManagerConnection.getJobMasterLeaderId(),
                                jobManagerConnection.getJobManagerGateway(),
                                jobInformation.getJobId(),
                                taskInformation.getJobVertexId(),
@@ -605,10 +605,15 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
                clearTasks();
        }
 
-       private void updateTaskExecutionState(final JobMasterGateway jobMasterGateway, final TaskExecutionState taskExecutionState) {
+       private void updateTaskExecutionState(
+                       final UUID jobMasterLeaderId,
+                       final JobMasterGateway jobMasterGateway,
+                       final TaskExecutionState taskExecutionState)
+       {
                final ExecutionAttemptID executionAttemptID = taskExecutionState.getID();
 
-               Future<Acknowledge> futureAcknowledge = jobMasterGateway.updateTaskExecutionState(taskExecutionState);
+               Future<Acknowledge> futureAcknowledge = jobMasterGateway.updateTaskExecutionState(
+                               jobMasterLeaderId, taskExecutionState);
 
                futureAcknowledge.exceptionallyAsync(new ApplyFunction<Throwable, Void>() {
                        @Override
@@ -620,7 +625,11 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
                }, getMainThreadExecutor());
        }
 
-       private void unregisterTaskAndNotifyFinalState(final JobMasterGateway jobMasterGateway, ExecutionAttemptID executionAttemptID) {
+       private void unregisterTaskAndNotifyFinalState(
+                       final UUID jobMasterLeaderId,
+                       final JobMasterGateway jobMasterGateway,
+                       final ExecutionAttemptID executionAttemptID)
+       {
                Task task = removeTask(executionAttemptID);
 
                if (task != null) {
@@ -638,14 +647,15 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
                        AccumulatorSnapshot accumulatorSnapshot = task.getAccumulatorRegistry().getSnapshot();
 
                        updateTaskExecutionState(
-                               jobMasterGateway,
-                               new TaskExecutionState(
-                                       task.getJobID(),
-                                       task.getExecutionId(),
-                                       task.getExecutionState(),
-                                       task.getFailureCause(),
-                                       accumulatorSnapshot,
-                                       task.getMetricGroup().getIOMetricGroup().createSnapshot()));
+                                       jobMasterLeaderId,
+                                       jobMasterGateway,
+                                       new TaskExecutionState(
+                                                       task.getJobID(),
+                                                       task.getExecutionId(),
+                                                       task.getExecutionState(),
+                                                       task.getFailureCause(),
+                                                       accumulatorSnapshot,
+                                                       task.getMetricGroup().getIOMetricGroup().createSnapshot()));
                } else {
                        log.error("Cannot find task with ID {} to unregister.", executionAttemptID);
                }
@@ -687,11 +697,14 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
                }
        }
 
-       private JobManagerConnection associateWithJobManager(JobMasterGateway jobMasterGateway, int blobPort) {
+       private JobManagerConnection associateWithJobManager(UUID jobMasterLeaderId,
+                       JobMasterGateway jobMasterGateway, int blobPort)
+       {
+               Preconditions.checkNotNull(jobMasterLeaderId);
                Preconditions.checkNotNull(jobMasterGateway);
                Preconditions.checkArgument(blobPort > 0 || blobPort <= 65535, "Blob port is out of range.");
 
-               TaskManagerActions taskManagerActions = new TaskManagerActionsImpl(jobMasterGateway);
+               TaskManagerActions taskManagerActions = new TaskManagerActionsImpl(jobMasterLeaderId, jobMasterGateway);
 
                CheckpointResponder checkpointResponder = new RpcCheckpointResponder(jobMasterGateway);
 
@@ -704,19 +717,21 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
                        taskManagerConfiguration.getCleanupInterval());
 
                ResultPartitionConsumableNotifier resultPartitionConsumableNotifier = new RpcResultPartitionConsumableNotifier(
-                       jobMasterGateway,
-                       getRpcService().getExecutor(),
-                       taskManagerConfiguration.getTimeout());
+                               jobMasterLeaderId,
+                               jobMasterGateway,
+                               getRpcService().getExecutor(),
+                               taskManagerConfiguration.getTimeout());
 
-               PartitionProducerStateChecker partitionStateChecker = new RpcPartitionStateChecker(jobMasterGateway);
+               PartitionProducerStateChecker partitionStateChecker = new RpcPartitionStateChecker(jobMasterLeaderId, jobMasterGateway);
 
                return new JobManagerConnection(
-                       jobMasterGateway,
-                       taskManagerActions,
-                       checkpointResponder,
-                       libraryCacheManager,
-                       resultPartitionConsumableNotifier,
-                       partitionStateChecker);
+                               jobMasterLeaderId,
+                               jobMasterGateway,
+                               taskManagerActions,
+                               checkpointResponder,
+                               libraryCacheManager,
+                               resultPartitionConsumableNotifier,
+                               partitionStateChecker);
        }
 
        private void disassociateFromJobManager(JobManagerConnection jobManagerConnection) throws IOException {
@@ -808,9 +823,11 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
        }
 
        private class TaskManagerActionsImpl implements TaskManagerActions {
+               private final UUID jobMasterLeaderId;
                private final JobMasterGateway jobMasterGateway;
 
-               private TaskManagerActionsImpl(JobMasterGateway jobMasterGateway) {
+               private TaskManagerActionsImpl(UUID jobMasterLeaderId, JobMasterGateway jobMasterGateway) {
+                       this.jobMasterLeaderId = Preconditions.checkNotNull(jobMasterLeaderId);
                        this.jobMasterGateway = Preconditions.checkNotNull(jobMasterGateway);
                }
 
@@ -819,7 +836,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
                        runAsync(new Runnable() {
                                @Override
                                public void run() {
-                                       unregisterTaskAndNotifyFinalState(jobMasterGateway, executionAttemptID);
+                                       unregisterTaskAndNotifyFinalState(jobMasterLeaderId, jobMasterGateway, executionAttemptID);
                                }
                        });
                }
@@ -842,7 +859,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 
                @Override
                public void updateTaskExecutionState(final TaskExecutionState taskExecutionState) {
-                       TaskExecutor.this.updateTaskExecutionState(jobMasterGateway, taskExecutionState);
+                       TaskExecutor.this.updateTaskExecutionState(jobMasterLeaderId, jobMasterGateway, taskExecutionState);
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c6486067/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.java
index 4850d63..3b9da48 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.java
@@ -31,7 +31,10 @@ import org.apache.flink.runtime.jobmaster.SerializedInputSplit;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
 
+import java.util.UUID;
+
 public class RpcInputSplitProvider implements InputSplitProvider {
+       private final UUID jobMasterLeaderId;
        private final JobMasterGateway jobMasterGateway;
        private final JobID jobID;
        private final JobVertexID jobVertexID;
@@ -39,11 +42,13 @@ public class RpcInputSplitProvider implements InputSplitProvider {
        private final Time timeout;
 
        public RpcInputSplitProvider(
+                       UUID jobMasterLeaderId,
                        JobMasterGateway jobMasterGateway,
                        JobID jobID,
                        JobVertexID jobVertexID,
                        ExecutionAttemptID executionAttemptID,
                        Time timeout) {
+               this.jobMasterLeaderId = Preconditions.checkNotNull(jobMasterLeaderId);
                this.jobMasterGateway = Preconditions.checkNotNull(jobMasterGateway);
                this.jobID = Preconditions.checkNotNull(jobID);
                this.jobVertexID = Preconditions.checkNotNull(jobVertexID);
@@ -56,7 +61,8 @@ public class RpcInputSplitProvider implements InputSplitProvider {
        public InputSplit getNextInputSplit(ClassLoader userCodeClassLoader) throws InputSplitProviderException {
                Preconditions.checkNotNull(userCodeClassLoader);
 
-               Future<SerializedInputSplit> futureInputSplit = jobMasterGateway.requestNextInputSplit(jobVertexID, executionAttemptID);
+               Future<SerializedInputSplit> futureInputSplit = jobMasterGateway.requestNextInputSplit(
+                               jobMasterLeaderId, jobVertexID, executionAttemptID);
 
                try {
                        SerializedInputSplit serializedInputSplit = futureInputSplit.get(timeout.getSize(), timeout.getUnit());

Reply via email to