[ https://issues.apache.org/jira/browse/FLINK-4348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15430814#comment-15430814 ]

ASF GitHub Bot commented on FLINK-4348:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2389#discussion_r75683123
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java ---
    @@ -94,26 +130,218 @@ public RegistrationResponse apply(final JobMasterGateway jobMasterGateway) {
         * @return Slot assignment
         */
        @RpcMethod
    -   public SlotAssignment requestSlot(SlotRequest slotRequest) {
    -           System.out.println("SlotRequest: " + slotRequest);
    -           return new SlotAssignment();
    +   public AcknowledgeSlotRequest requestSlot(SlotRequest slotRequest) {
    +           // TODO slotManager.requestSlot(slotRequest);
    +           return new AcknowledgeSlotRequest(slotRequest.getAllocationID());
        }
     
     
        /**
    -    *
    -    * @param resourceManagerLeaderId  The fencing token for the ResourceManager leader
    -    * @param taskExecutorAddress      The address of the TaskExecutor that registers
    -    * @param resourceID               The resource ID of the TaskExecutor that registers
    +    * @param resourceManagerLeaderId The fencing token for the ResourceManager leader
    +    * @param taskExecutorAddress     The address of the TaskExecutor that registers
    +    * @param resourceID              The resource ID of the TaskExecutor that registers
         *
         * @return The response by the ResourceManager.
         */
        @RpcMethod
    -   public org.apache.flink.runtime.rpc.registration.RegistrationResponse registerTaskExecutor(
    -                   UUID resourceManagerLeaderId,
    -                   String taskExecutorAddress,
    -                   ResourceID resourceID) {
    +   public Future<org.apache.flink.runtime.rpc.registration.RegistrationResponse> registerTaskExecutor(
    +           final UUID resourceManagerLeaderId,
    +           final String taskExecutorAddress,
    +           final ResourceID resourceID
    +   ) {
    +           log.info("received register from taskExecutor {}, address {}", resourceID, taskExecutorAddress);
    +           Future<TaskExecutorGateway> taskExecutorFuture =
    +                   getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class);
    +
    +           return taskExecutorFuture.map(new Mapper<TaskExecutorGateway, org.apache.flink.runtime.rpc.registration.RegistrationResponse>() {
    +                   @Override
    +                   public org.apache.flink.runtime.rpc.registration.RegistrationResponse apply(final TaskExecutorGateway taskExecutorGateway) {
    +                           // decline registration if resourceManager cannot connect to the taskExecutor using the given address
    +                           if(taskExecutorGateway == null) {
    +                                   log.warn("resourceManager {} decline registration from the taskExecutor {}, cannot connect to it using given address {} ",
    +                                           getAddress(), resourceID, taskExecutorAddress);
    +                                   return new org.apache.flink.runtime.rpc.registration.RegistrationResponse.Decline("cannot connect to taskExecutor using given address");
    +                           } else {
    +                                   // save the register taskExecutor gateway
    +                                   taskExecutorGateways.put(resourceID, taskExecutorGateway);
    +                                   // schedule the heartbeat with the registered taskExecutor
    +                                   ResourceManagerToTaskExecutorHeartbeatScheduler heartbeatScheduler = new ResourceManagerToTaskExecutorHeartbeatScheduler(
    +                                           ResourceManager.this, leaderSessionID, taskExecutorGateway, taskExecutorAddress, resourceID, log);
    +                                   heartbeatScheduler.start();
    +                                   heartbeatSchedulers.put(resourceID, heartbeatScheduler);
    +                                   return new TaskExecutorRegistrationSuccess(new InstanceID(), heartbeatScheduler.getHeartbeatInterval());
    +                           }
    +                   }
    +           }, getMainThreadExecutionContext());
    +
    +   }
    +
    +   /**
    +    * notify resource failure to resourceManager, because of two reasons:
    +    * 1. cannot keep heartbeat with taskManager for several times, mark the resource as failed
    +    * 2. in some corner cases, TM will be marked as invalid by cluster manager master(e.g. yarn master), but TM itself does not realize.
    +    *
    +    * @param resourceID identify the taskManager which to stop
    +    */
    +   @RpcMethod
    +   public void notifyResourceFailure(ResourceID resourceID) {
    +           log.warn("receive failure notification of resource {}", resourceID);
    +           TaskExecutorGateway taskManager = taskExecutorGateways.get(resourceID);
    +           if(taskManager == null) {
    +                   // ignore command to stop an unregister taskManager
    +                   log.warn("ignore stop taskManager command because {} is unregistered", resourceID);
    +           } else {
    +                   taskExecutorGateways.remove(resourceID);
    +                   closeHeartbeatToResourceIfExist(resourceID);
    +                   // TODO notify slotManager and notify jobMaster,
    +                   // TODO slotManager.notifyTaskManagerFailure(resourceID);
    +                   taskManager.shutDown(leaderSessionID);
    +           }
    +   }
    +
    +   /**
    +    * close heartbeat triggers to resource if exist
    +    * @param resourceID which resource need to stop keep heartbeat with
    +    */
    +   private void closeHeartbeatToResourceIfExist(ResourceID resourceID) {
    +           if(heartbeatSchedulers.containsKey(resourceID)) {
    +                   ResourceManagerToTaskExecutorHeartbeatScheduler heartbeatManager = heartbeatSchedulers.get(resourceID);
    +                   heartbeatManager.close();
    +                   heartbeatSchedulers.remove(resourceID);
    +           }
    +   }
    +
    +   /**
    +    * send slotRequest to the taskManager which the given slot is on
    +    *
    +    * @param slotRequest slot request information
    +    * @param slotID      which slot is choosen
    +    */
    +   void requestSlotToTaskManager(final SlotRequest slotRequest, final SlotID slotID) {
    +           ResourceID resourceID = slotID.getResourceID();
    +           TaskExecutorGateway taskManager = taskExecutorGateways.get(resourceID);
    --- End diff --
    
    `taskManager`? Should this be renamed to `taskExecutor`?


> Implement communication from ResourceManager to TaskManager
> -----------------------------------------------------------
>
>                 Key: FLINK-4348
>                 URL: https://issues.apache.org/jira/browse/FLINK-4348
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Cluster Management
>            Reporter: Kurt Young
>            Assignee: zhangjing
>
> There are three main interactions initiated from the RM to the TM:
> * Heartbeat: the RM uses heartbeats to sync with the TM's slot status.
> * SlotRequest: when the RM decides to assign a slot to the JM, it should first send a slot request to the TM. The TM can either accept or reject this request.
> * FailureNotify: in some corner cases, the TM is marked as invalid by the cluster manager master (e.g. the YARN master) without the TM itself realizing it. The RM should send a failure notification to the TM so that the TM can terminate itself.
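
The three RM-to-TM interactions listed in the issue description can be sketched roughly as below. This is only an illustration under stated assumptions: `RmToTmSketch`, `TaskManagerGateway`, `InMemoryTaskManager` and the string-based slot and allocation IDs are hypothetical names invented for this sketch. They are not the `TaskExecutorGateway` from the pull request and not part of Flink.

```java
import java.util.HashMap;
import java.util.Map;

public class RmToTmSketch {

    /** Hypothetical RM-side view of a TaskManager (not Flink's TaskExecutorGateway). */
    interface TaskManagerGateway {
        /** Heartbeat: returns slot id -> occupied, so the RM can sync slot status. */
        Map<String, Boolean> heartbeat();

        /** SlotRequest: the TM either accepts (true) or rejects (false) the request. */
        boolean requestSlot(String slotId, String allocationId);

        /** FailureNotify: the RM tells the TM it was marked as failed so it can stop. */
        void notifyFailure(String reason);
    }

    /** In-memory stand-in for a TaskManager, used only to exercise the interface. */
    static final class InMemoryTaskManager implements TaskManagerGateway {
        // slot id -> allocation id, or null while the slot is free
        private final Map<String, String> slots = new HashMap<>();
        private boolean running = true;

        InMemoryTaskManager(String... slotIds) {
            for (String id : slotIds) {
                slots.put(id, null);
            }
        }

        @Override
        public Map<String, Boolean> heartbeat() {
            Map<String, Boolean> status = new HashMap<>();
            for (Map.Entry<String, String> e : slots.entrySet()) {
                status.put(e.getKey(), e.getValue() != null);
            }
            return status;
        }

        @Override
        public boolean requestSlot(String slotId, String allocationId) {
            // Reject if the TM is stopped, the slot is unknown, or it is already taken.
            if (!running || !slots.containsKey(slotId) || slots.get(slotId) != null) {
                return false;
            }
            slots.put(slotId, allocationId);
            return true;
        }

        @Override
        public void notifyFailure(String reason) {
            // A real TM would release resources and terminate; here we only flip a flag.
            running = false;
        }

        boolean isRunning() {
            return running;
        }
    }

    public static void main(String[] args) {
        InMemoryTaskManager tm = new InMemoryTaskManager("slot-0", "slot-1");
        System.out.println("first request accepted:  " + tm.requestSlot("slot-0", "alloc-A"));
        System.out.println("second request accepted: " + tm.requestSlot("slot-0", "alloc-B"));
        System.out.println("slot status: " + tm.heartbeat());
        tm.notifyFailure("marked invalid by cluster manager");
        System.out.println("still running: " + tm.isRunning());
    }
}
```

In this sketch the slot request is a synchronous call returning a boolean purely for brevity; the pull request itself works with futures and gateway objects, so the real signatures may differ.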



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
