[ https://issues.apache.org/jira/browse/FLINK-7331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16108806#comment-16108806 ]

ASF GitHub Bot commented on FLINK-7331:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4444#discussion_r130592145
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ---
    @@ -653,6 +565,130 @@ UUID getLeaderSessionId() {
        //  Internal methods
        // 
------------------------------------------------------------------------
     
    +   /**
    +    * Registers a new JobMaster.
    +    *
    +    * @param jobMasterGateway to communicate with the registering JobMaster
    +    * @param jobLeaderId leader id of the JobMaster
    +    * @param jobId of the job for which the JobMaster is responsible
    +    * @param jobManagerAddress address of the JobMaster
    +    * @param jobManagerResourceId ResourceID of the JobMaster
    +    * @return RegistrationResponse
    +    */
    +   private RegistrationResponse registerJobMasterInternal(
    +           final JobMasterGateway jobMasterGateway,
    +           UUID jobLeaderId,
    +           JobID jobId,
    +           String jobManagerAddress,
    +           ResourceID jobManagerResourceId) {
    +           if (jobManagerRegistrations.containsKey(jobId)) {
    +                   JobManagerRegistration oldJobManagerRegistration = 
jobManagerRegistrations.get(jobId);
    +
    +                   if 
(oldJobManagerRegistration.getLeaderID().equals(jobLeaderId)) {
    +                           // same registration
    +                           log.debug("Job manager {}@{} was already 
registered.", jobLeaderId, jobManagerAddress);
    +                   } else {
    +                           // tell old job manager that he is no longer 
the job leader
    +                           disconnectJobManager(
    +                                   oldJobManagerRegistration.getJobID(),
    +                                   new Exception("New job leader for job " 
+ jobId + " found."));
    +
    +                           JobManagerRegistration jobManagerRegistration = 
new JobManagerRegistration(
    +                                   jobId,
    +                                   jobManagerResourceId,
    +                                   jobLeaderId,
    +                                   jobMasterGateway);
    +                           jobManagerRegistrations.put(jobId, 
jobManagerRegistration);
    +                           
jmResourceIdRegistrations.put(jobManagerResourceId, jobManagerRegistration);
    +                   }
    +           } else {
    +                   // new registration for the job
    +                   JobManagerRegistration jobManagerRegistration = new 
JobManagerRegistration(
    +                           jobId,
    +                           jobManagerResourceId,
    +                           jobLeaderId,
    +                           jobMasterGateway);
    +                   jobManagerRegistrations.put(jobId, 
jobManagerRegistration);
    +                   jmResourceIdRegistrations.put(jobManagerResourceId, 
jobManagerRegistration);
    +           }
    +
    +           log.info("Registered job manager {}@{} for job {}.", 
jobLeaderId, jobManagerAddress, jobId);
    +
    +           jobManagerHeartbeatManager.monitorTarget(jobManagerResourceId, 
new HeartbeatTarget<Void>() {
    +                   @Override
    +                   public void receiveHeartbeat(ResourceID resourceID, 
Void payload) {
    +                           // the ResourceManager will always send 
heartbeat requests to the JobManager
    +                   }
    +
    +                   @Override
    +                   public void requestHeartbeat(ResourceID resourceID, 
Void payload) {
    +                           
jobMasterGateway.heartbeatFromResourceManager(resourceID);
    +                   }
    +           });
    +
    +           return new JobMasterRegistrationSuccess(
    +                   
resourceManagerConfiguration.getHeartbeatInterval().toMilliseconds(),
    +                   getLeaderSessionId(),
    +                   resourceId);
    +   }
    +
    +   /**
    +    * Registers a new TaskExecutor.
    +    *
    +    * @param taskExecutorGateway to communicate with the registering 
TaskExecutor
    +    * @param taskExecutorAddress address of the TaskExecutor
    +    * @param taskExecutorResourceId ResourceID of the TaskExecutor
    +    * @param slotReport initial slot report from the TaskExecutor
    +    * @return RegistrationResponse
    +    */
    +   private RegistrationResponse registerTaskExecutorInternal(
    +           TaskExecutorGateway taskExecutorGateway,
    +           String taskExecutorAddress,
    +           ResourceID taskExecutorResourceId,
    +           SlotReport slotReport) {
    +           WorkerRegistration<WorkerType> oldRegistration = 
taskExecutors.remove(taskExecutorResourceId);
    +           if (oldRegistration != null) {
    +                   // TODO :: suggest old taskExecutor to stop itself
    +                   log.info("Replacing old instance of worker for 
ResourceID {}", taskExecutorResourceId);
    +
    +                   // remove old task manager registration from slot 
manager
    +                   
slotManager.unregisterTaskManager(oldRegistration.getInstanceID());
    +           }
    +
    +           final WorkerType newWorker = 
workerStarted(taskExecutorResourceId);
    +
    +           if(newWorker == null) {
    --- End diff --
    
    Good catch. Will fix it.


> Remove Flink's futures from ResourceManager
> -------------------------------------------
>
>                 Key: FLINK-7331
>                 URL: https://issues.apache.org/jira/browse/FLINK-7331
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Distributed Coordination
>    Affects Versions: 1.4.0
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>            Priority: Minor
>             Fix For: 1.4.0
>
>
> Remove the Flink {{Futures}} that are used only internally.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to