[FLINK-4986] Improvements to the JobMaster

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8730e200
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8730e200
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8730e200

Branch: refs/heads/master
Commit: 8730e200864427dd5d6ddb9f841978d68ab452bd
Parents: 91f1d09
Author: Stephan Ewen <[email protected]>
Authored: Mon Oct 17 20:26:58 2016 +0200
Committer: Stephan Ewen <[email protected]>
Committed: Fri Dec 23 20:54:25 2016 +0100

----------------------------------------------------------------------
 .../runtime/executiongraph/ExecutionGraph.java  |  15 +
 .../flink/runtime/jobmaster/JobMaster.java      | 336 ++++++++++---------
 2 files changed, 184 insertions(+), 167 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8730e200/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index cbb4c7e..2025fc2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -621,6 +621,21 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
        }
 
        /**
+        * Gets the accumulator results.
+        */
+       public Map<String, Object> getAccumulators() throws IOException {
+
+               Map<String, Accumulator<?, ?>> accumulatorMap = 
aggregateUserAccumulators();
+
+               Map<String, Object> result = new HashMap<>();
+               for (Map.Entry<String, Accumulator<?, ?>> entry : 
accumulatorMap.entrySet()) {
+                       result.put(entry.getKey(), 
entry.getValue().getLocalValue());
+               }
+
+               return result;
+       }
+
+       /**
         * Gets a serialized accumulator map.
         * @return The accumulator map with serialized accumulator values.
         * @throws IOException

http://git-wip-us.apache.org/repos/asf/flink/blob/8730e200/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 458bf0c..0b3b68e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.jobmaster;
 
+import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.time.Time;
@@ -33,7 +34,6 @@ import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
 import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.runtime.client.SerializedJobExecutionResult;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.BiFunction;
@@ -51,6 +51,7 @@ import 
org.apache.flink.runtime.executiongraph.JobStatusListener;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.LeaderIdMismatchException;
 import org.apache.flink.runtime.instance.Slot;
 import org.apache.flink.runtime.instance.SlotDescriptor;
 import org.apache.flink.runtime.instance.SlotPool;
@@ -90,7 +91,6 @@ import 
org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.util.SerializedThrowable;
 import org.apache.flink.util.InstantiationUtil;
-import org.apache.flink.util.SerializedValue;
 import org.slf4j.Logger;
 
 import javax.annotation.Nullable;
@@ -246,6 +246,9 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
                                -1,
                                log);
 
+               // register self as job status change listener
+               executionGraph.registerJobStatusListener(new 
JobManagerJobStatusListener());
+
                this.slotPool = new SlotPool(executorService);
                this.allocationTimeout = Time.of(5, TimeUnit.SECONDS);
 
@@ -269,13 +272,18 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
         */
        public void start(final UUID leaderSessionID) throws Exception {
                if (LEADER_ID_UPDATER.compareAndSet(this, null, 
leaderSessionID)) {
-                       super.start();
 
+                       // make sure the slot pool now accepts messages for 
this leader  
                        slotPool.setJobManagerLeaderId(leaderSessionID);
-                       log.info("Starting JobManager for job {} ({})", 
jobGraph.getName(), jobGraph.getJobID());
+
+                       // make sure we receive RPC and async calls
+                       super.start();
+
+                       log.info("JobManager started as leader {} for job {}", 
leaderSessionID, jobGraph.getJobID());
                        getSelf().startJobExecution();
-               } else {
-                       log.warn("Job already started with leaderId {}, 
ignoring this start request.", leaderSessionID);
+               }
+               else {
+                       log.warn("Job already started with leader ID {}, 
ignoring this start request.", leaderSessionID);
                }
        }
 
@@ -297,48 +305,21 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
 
        @RpcMethod
        public void startJobExecution() {
-               log.info("Starting execution of job {} ({}) with leaderId {}.",
-                               jobGraph.getName(), jobGraph.getJobID(), 
leaderSessionID);
+               log.info("Starting execution of job {} ({})", 
jobGraph.getName(), jobGraph.getJobID());
 
                try {
-                       // register self as job status change listener
-                       executionGraph.registerJobStatusListener(new 
JobStatusListener() {
-                               @Override
-                               public void jobStatusChanges(
-                                               final JobID jobId, final 
JobStatus newJobStatus, final long timestamp, final Throwable error)
-                               {
-                                       // run in rpc thread to avoid 
concurrency
-                                       runAsync(new Runnable() {
-                                               @Override
-                                               public void run() {
-                                                       
jobStatusChanged(newJobStatus, timestamp, error);
-                                               }
-                                       });
-                               }
-                       });
-
                        // job is ready to go, try to establish connection with 
resource manager
+                       //   - activate leader retrieval for the resource 
manager
+                       //   - on notification of the leader, the connection 
will be established and
+                       //     the slot pool will start requesting slots
                        resourceManagerLeaderRetriever.start(new 
ResourceManagerLeaderListener());
-               } catch (Throwable t) {
-
-                       // TODO - this should not result in a job failure, but 
another leader should take over
-                       // TODO - either this master should retry the 
execution, or it should relinquish leadership / terminate
-
+               }
+               catch (Throwable t) {
                        log.error("Failed to start job {} ({})", 
jobGraph.getName(), jobGraph.getJobID(), t);
 
-                       executionGraph.fail(t);
-
-                       final JobExecutionException rt;
-                       if (t instanceof JobExecutionException) {
-                               rt = (JobExecutionException) t;
-                       } else {
-                               rt = new 
JobExecutionException(jobGraph.getJobID(),
-                                               "Failed to start job " + 
jobGraph.getJobID() + " (" + jobGraph.getName() + ")", t);
-                       }
-
-                       // TODO: notify client about this failure
+                       handleFatalError(new Exception(
+                                       "Could not start job execution: Failed 
to start leader service for Resource Manager", t));
 
-                       jobCompletionActions.jobFailed(rt);
                        return;
                }
 
@@ -348,7 +329,8 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
                        public void run() {
                                try {
                                        executionGraph.scheduleForExecution(new 
PooledSlotProvider(slotPool, allocationTimeout));
-                               } catch (Throwable t) {
+                               }
+                               catch (Throwable t) {
                                        executionGraph.fail(t);
                                }
                        }
@@ -386,6 +368,8 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
                }
                closeResourceManagerConnection();
 
+               // TODO: in the future, the slot pool should not release the 
resources, so that
+               // TODO: the TaskManagers offer the resources to the new leader 
                for (ResourceID taskManagerId : 
registeredTaskManagers.keySet()) {
                        slotPool.releaseResource(taskManagerId);
                }
@@ -405,14 +389,8 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
                        final UUID leaderSessionID,
                        final TaskExecutionState taskExecutionState) throws 
Exception
        {
-               if (taskExecutionState == null) {
-                       throw new NullPointerException("TaskExecutionState must 
not be null.");
-               }
-
-               if (!this.leaderSessionID.equals(leaderSessionID)) {
-                       throw new Exception("Leader id not match, expected: " + 
this.leaderSessionID
-                                       + ", actual: " + leaderSessionID);
-               }
+               checkNotNull(taskExecutionState, "taskExecutionState");
+               validateLeaderSessionId(leaderSessionID);
 
                if (executionGraph.updateState(taskExecutionState)) {
                        return Acknowledge.get();
@@ -428,10 +406,7 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
                        final JobVertexID vertexID,
                        final ExecutionAttemptID executionAttempt) throws 
Exception
        {
-               if (!this.leaderSessionID.equals(leaderSessionID)) {
-                       throw new Exception("Leader id not match, expected: " + 
this.leaderSessionID
-                                       + ", actual: " + leaderSessionID);
-               }
+               validateLeaderSessionId(leaderSessionID);
 
                final Execution execution = 
executionGraph.getRegisteredExecutions().get(executionAttempt);
                if (execution == null) {
@@ -477,16 +452,13 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
                }
        }
 
        @RpcMethod
        public ExecutionState requestPartitionState(
                        final UUID leaderSessionID,
                        final IntermediateDataSetID intermediateResultId,
                        final ResultPartitionID resultPartitionId) throws 
Exception {
 
-               if (!this.leaderSessionID.equals(leaderSessionID)) {
-                       throw new Exception("Leader id not match, expected: " + 
this.leaderSessionID
-                                       + ", actual: " + leaderSessionID);
-               }
+               validateLeaderSessionId(leaderSessionID);
 
                final Execution execution = 
executionGraph.getRegisteredExecutions().get(resultPartitionId.getProducerId());
                if (execution != null) {
@@ -520,10 +491,7 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
                        final UUID leaderSessionID,
                        final ResultPartitionID partitionID) throws Exception
        {
-               if (!this.leaderSessionID.equals(leaderSessionID)) {
-                       throw new Exception("Leader id not match, expected: " + 
this.leaderSessionID
-                                       + ", actual: " + leaderSessionID);
-               }
+               validateLeaderSessionId(leaderSessionID);
 
                executionGraph.scheduleOrUpdateConsumers(partitionID);
                return Acknowledge.get();
@@ -534,6 +502,7 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
                throw new UnsupportedOperationException();
        }
 
+       // TODO: This method needs a leader session ID
        @RpcMethod
        public void acknowledgeCheckpoint(
                        final JobID jobID,
@@ -562,6 +531,7 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
                }
        }
 
+       // TODO: This method needs a leader session ID
        @RpcMethod
        public void declineCheckpoint(
                        final JobID jobID,
@@ -657,10 +627,7 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
        public Iterable<SlotOffer> offerSlots(final ResourceID taskManagerId,
                        final Iterable<SlotOffer> slots, final UUID leaderId) 
throws Exception
        {
-               if (!this.leaderSessionID.equals(leaderId)) {
-                       throw new Exception("Leader id not match, expected: " + 
this.leaderSessionID
-                                       + ", actual: " + leaderId);
-               }
+               validateLeaderSessionId(leaderId);
 
                Tuple2<TaskManagerLocation, TaskExecutorGateway> taskManager = 
registeredTaskManagers.get(taskManagerId);
                if (taskManager == null) {
@@ -689,10 +656,7 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
                        final UUID leaderId,
                        final Exception cause) throws Exception
        {
-               if (!this.leaderSessionID.equals(leaderId)) {
-                       throw new Exception("Leader id not match, expected: " + 
this.leaderSessionID
-                                       + ", actual: " + leaderId);
-               }
+               validateLeaderSessionId(leaderId);
 
                if (!registeredTaskManagers.containsKey(taskManagerId)) {
                        throw new Exception("Unknown TaskManager " + 
taskManagerId);
@@ -782,62 +746,55 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
        }
 
        private void jobStatusChanged(final JobStatus newJobStatus, long 
timestamp, final Throwable error) {
+               validateRunsInMainThread();
+
                final JobID jobID = executionGraph.getJobID();
                final String jobName = executionGraph.getJobName();
+
                log.info("Status of job {} ({}) changed to {}.", jobID, 
jobName, newJobStatus, error);
 
                if (newJobStatus.isGloballyTerminalState()) {
-                       // TODO set job end time in JobInfo
-
-                       /*
-                         TODO
-                         if (jobInfo.sessionAlive) {
-                jobInfo.setLastActive()
-                val lastActivity = jobInfo.lastActive
-                context.system.scheduler.scheduleOnce(jobInfo.sessionTimeout 
seconds) {
-                  // remove only if no activity occurred in the meantime
-                  if (lastActivity == jobInfo.lastActive) {
-                    self ! decorateMessage(RemoveJob(jobID, 
removeJobFromStateBackend = true))
-                  }
-                }(context.dispatcher)
-              } else {
-                self ! decorateMessage(RemoveJob(jobID, 
removeJobFromStateBackend = true))
-              }
-                        */
-
-                       if (newJobStatus == JobStatus.FINISHED) {
-                               try {
-                                       final Map<String, 
SerializedValue<Object>> accumulatorResults =
-                                                       
executionGraph.getAccumulatorsSerialized();
-                                       final SerializedJobExecutionResult 
result = new SerializedJobExecutionResult(
-                                                       jobID, 0, 
accumulatorResults // TODO get correct job duration
-                                       );
-                                       
jobCompletionActions.jobFinished(result.toJobExecutionResult(userCodeLoader));
-                               } catch (Exception e) {
-                                       log.error("Cannot fetch final 
accumulators for job {} ({})", jobName, jobID, e);
+                       switch (newJobStatus) {
+                               case FINISHED:
+                                       try {
+                                               // TODO get correct job duration
+                                               // job done, let's get the 
accumulators
+                                               Map<String, Object> 
accumulatorResults = executionGraph.getAccumulators();
+                                               JobExecutionResult result = new 
JobExecutionResult(jobID, 0L, accumulatorResults); 
+
+                                               
jobCompletionActions.jobFinished(result);
+                                       }
+                                       catch (Exception e) {
+                                               log.error("Cannot fetch final 
accumulators for job {} ({})", jobName, jobID, e);
+
+                                               final JobExecutionException 
exception = new JobExecutionException(jobID, 
+                                                               "Failed to 
retrieve accumulator results. " +
+                                                               "The job is 
registered as 'FINISHED' (successful), but this notification describes " +
+                                                               "a failure, 
since the resulting accumulators could not be fetched.", e);
+
+                                               
jobCompletionActions.jobFailed(exception);
+                                       }
+                                       break;
+
+                               case CANCELED: {
                                        final JobExecutionException exception = 
new JobExecutionException(
-                                                       jobID, "Failed to 
retrieve accumulator results.", e);
-                                       // TODO should we also notify client?
+                                               jobID, "Job was cancelled.", 
new Exception("The job was cancelled"));
+
                                        
jobCompletionActions.jobFailed(exception);
+                                       break;
                                }
-                       } else if (newJobStatus == JobStatus.CANCELED) {
-                               final Throwable unpackedError = 
SerializedThrowable.get(error, userCodeLoader);
-                               final JobExecutionException exception = new 
JobExecutionException(
-                                               jobID, "Job was cancelled.", 
unpackedError);
-                               // TODO should we also notify client?
-                               jobCompletionActions.jobFailed(exception);
-                       } else if (newJobStatus == JobStatus.FAILED) {
-                               final Throwable unpackedError = 
SerializedThrowable.get(error, userCodeLoader);
-                               final JobExecutionException exception = new 
JobExecutionException(
-                                               jobID, "Job execution failed.", 
unpackedError);
-                               // TODO should we also notify client?
-                               jobCompletionActions.jobFailed(exception);
-                       } else {
-                               final JobExecutionException exception = new 
JobExecutionException(
-                                               jobID, newJobStatus + " is not 
a terminal state.");
-                               // TODO should we also notify client?
-                               jobCompletionActions.jobFailed(exception);
-                               throw new RuntimeException(exception);
+
+                               case FAILED: {
+                                       final Throwable unpackedError = 
SerializedThrowable.get(error, userCodeLoader);
+                                       final JobExecutionException exception = 
new JobExecutionException(
+                                                       jobID, "Job execution 
failed.", unpackedError);
+                                       
jobCompletionActions.jobFailed(exception);
+                                       break;
+                               }
+
+                               default:
+                                       // this can happen only if the enum is 
buggy
+                                       throw new 
IllegalStateException(newJobStatus.toString());
                        }
                }
        }
@@ -845,57 +802,52 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
        private void notifyOfNewResourceManagerLeader(
                        final String resourceManagerAddress, final UUID 
resourceManagerLeaderId)
        {
-               // IMPORTANT: executed by main thread to avoid concurrence
-               runAsync(new Runnable() {
-                       @Override
-                       public void run() {
-                               if (resourceManagerConnection != null) {
-                                       if (resourceManagerAddress != null) {
-                                               if 
(resourceManagerAddress.equals(resourceManagerConnection.getTargetAddress())
-                                                               && 
resourceManagerLeaderId.equals(resourceManagerConnection.getTargetLeaderId())) {
-                                                       // both address and 
leader id are not changed, we can keep the old connection
-                                                       return;
-                                               }
-                                               log.info("ResourceManager 
leader changed from {} to {}. Registering at new leader.",
-                                                               
resourceManagerConnection.getTargetAddress(), resourceManagerAddress);
-                                       } else {
-                                               log.info("Current 
ResourceManager {} lost leader status. Waiting for new ResourceManager leader.",
-                                                               
resourceManagerConnection.getTargetAddress());
-                                       }
-                               }
+               validateRunsInMainThread();
 
-                               closeResourceManagerConnection();
-
-                               if (resourceManagerAddress != null) {
-                                       log.info("Attempting to register at 
ResourceManager {}", resourceManagerAddress);
-                                       resourceManagerConnection = new 
ResourceManagerConnection(
-                                                       log, 
jobGraph.getJobID(), leaderSessionID,
-                                                       resourceManagerAddress, 
resourceManagerLeaderId, executionContext);
-                                       resourceManagerConnection.start();
+               if (resourceManagerConnection != null) {
+                       if (resourceManagerAddress != null) {
+                               if 
(resourceManagerAddress.equals(resourceManagerConnection.getTargetAddress())
+                                               && 
resourceManagerLeaderId.equals(resourceManagerConnection.getTargetLeaderId())) {
+                                       // both address and leader id are not 
changed, we can keep the old connection
+                                       return;
                                }
+                               log.info("ResourceManager leader changed from 
{} to {}. Registering at new leader.",
+                                               
resourceManagerConnection.getTargetAddress(), resourceManagerAddress);
+                       } else {
+                               log.info("Current ResourceManager {} lost 
leader status. Waiting for new ResourceManager leader.",
+                                               
resourceManagerConnection.getTargetAddress());
                        }
-               });
+               }
+
+               closeResourceManagerConnection();
+
+               if (resourceManagerAddress != null) {
+                       log.info("Attempting to register at ResourceManager 
{}", resourceManagerAddress);
+                       resourceManagerConnection = new 
ResourceManagerConnection(
+                                       log, jobGraph.getJobID(), getAddress(), 
leaderSessionID,
+                                       resourceManagerAddress, 
resourceManagerLeaderId, executionContext);
+                       resourceManagerConnection.start();
+               }
        }
 
        private void onResourceManagerRegistrationSuccess(final 
JobMasterRegistrationSuccess success) {
-               getRpcService().execute(new Runnable() {
-                       @Override
-                       public void run() {
-                               // TODO - add tests for comment in 
https://github.com/apache/flink/pull/2565
-                               // verify the response with current connection
-                               if (resourceManagerConnection != null
-                                               && 
resourceManagerConnection.getTargetLeaderId().equals(success.getResourceManagerLeaderId()))
-                               {
-                                       log.info("JobManager successfully 
registered at ResourceManager, leader id: {}.",
-                                                       
success.getResourceManagerLeaderId());
-                                       
slotPool.setResourceManager(success.getResourceManagerLeaderId(),
-                                                       
resourceManagerConnection.getTargetGateway());
-                               }
-                       }
-               });
+               validateRunsInMainThread();
+       
+               // verify the response with current connection
+               if (resourceManagerConnection != null
+                               && 
resourceManagerConnection.getTargetLeaderId().equals(success.getResourceManagerLeaderId()))
+               {
+                       log.info("JobManager successfully registered at 
ResourceManager, leader id: {}.",
+                                       success.getResourceManagerLeaderId());
+
+                       slotPool.setResourceManager(
+                                       success.getResourceManagerLeaderId(), 
resourceManagerConnection.getTargetGateway());
+               }
        }
 
        private void closeResourceManagerConnection() {
+               validateRunsInMainThread();
+
                if (resourceManagerConnection != null) {
                        resourceManagerConnection.close();
                        resourceManagerConnection = null;
@@ -903,32 +855,49 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
                slotPool.disconnectResourceManager();
        }
 
+       private void validateLeaderSessionId(UUID leaderSessionID) throws 
LeaderIdMismatchException {
+               if (this.leaderSessionID == null || 
!this.leaderSessionID.equals(leaderSessionID)) {
+                       throw new 
LeaderIdMismatchException(this.leaderSessionID, leaderSessionID);
+               }
+       }
+
        
//----------------------------------------------------------------------------------------------
        // Utility classes
        
//----------------------------------------------------------------------------------------------
 
        private class ResourceManagerLeaderListener implements 
LeaderRetrievalListener {
+
                @Override
                public void notifyLeaderAddress(final String leaderAddress, 
final UUID leaderSessionID) {
-                       notifyOfNewResourceManagerLeader(leaderAddress, 
leaderSessionID);
+                       runAsync(new Runnable() {
+                               @Override
+                               public void run() {
+                                       
notifyOfNewResourceManagerLeader(leaderAddress, leaderSessionID);
+                               }
+                       });
                }
 
                @Override
                public void handleError(final Exception exception) {
-                       handleFatalError(exception);
+                       handleFatalError(new Exception("Fatal error in the 
ResourceManager leader service", exception));
                }
        }
 
+       
//----------------------------------------------------------------------------------------------
+
        private class ResourceManagerConnection
                        extends RegisteredRpcConnection<ResourceManagerGateway, 
JobMasterRegistrationSuccess>
        {
                private final JobID jobID;
 
+               private final String jobManagerRpcAddress;
+
                private final UUID jobManagerLeaderID;
 
                ResourceManagerConnection(
                                final Logger log,
                                final JobID jobID,
+                               final String jobManagerRpcAddress,
                                final UUID jobManagerLeaderID,
                                final String resourceManagerAddress,
                                final UUID resourceManagerLeaderID,
@@ -936,6 +905,7 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
                {
                        super(log, resourceManagerAddress, 
resourceManagerLeaderID, executor);
                        this.jobID = checkNotNull(jobID);
+                       this.jobManagerRpcAddress = 
checkNotNull(jobManagerRpcAddress);
                        this.jobManagerLeaderID = 
checkNotNull(jobManagerLeaderID);
                }
 
@@ -946,18 +916,29 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
                                        getTargetAddress(), getTargetLeaderId())
                        {
                                @Override
-                               protected Future<RegistrationResponse> 
invokeRegistration(ResourceManagerGateway gateway, UUID leaderId,
-                                               long timeoutMillis) throws 
Exception
+                               protected Future<RegistrationResponse> 
invokeRegistration(
+                                               ResourceManagerGateway gateway, 
UUID leaderId, long timeoutMillis) throws Exception
                                {
                                        Time timeout = 
Time.milliseconds(timeoutMillis);
-                                       return 
gateway.registerJobManager(leaderId, jobManagerLeaderID, getAddress(), jobID, 
timeout);
+
+                                       return gateway.registerJobManager(
+                                               leaderId,
+                                               jobManagerLeaderID,
+                                               jobManagerRpcAddress,
+                                               jobID,
+                                               timeout);
                                }
                        };
                }
 
                @Override
                protected void onRegistrationSuccess(final 
JobMasterRegistrationSuccess success) {
-                       onResourceManagerRegistrationSuccess(success);
+                       runAsync(new Runnable() {
+                               @Override
+                               public void run() {
+                                       
onResourceManagerRegistrationSuccess(success);
+                               }
+                       });
                }
 
                @Override
@@ -965,4 +946,25 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
                        handleFatalError(failure);
                }
        }
+
+       
//----------------------------------------------------------------------------------------------
+
+       private class JobManagerJobStatusListener implements JobStatusListener {
+
+               @Override
+               public void jobStatusChanges(
+                               final JobID jobId,
+                               final JobStatus newJobStatus,
+                               final long timestamp,
+                               final Throwable error) {
+
+                       // run in rpc thread to avoid concurrency
+                       runAsync(new Runnable() {
+                               @Override
+                               public void run() {
+                                       jobStatusChanged(newJobStatus, 
timestamp, error);
+                               }
+                       });
+               }
+       }
 }

Reply via email to