[ https://issues.apache.org/jira/browse/FLINK-4535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15458373#comment-15458373 ]

ASF GitHub Bot commented on FLINK-4535:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2451#discussion_r77337006
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java ---
    @@ -138,22 +150,46 @@ public SlotAssignment requestSlot(SlotRequest slotRequest) {
     
     
        /**
    -    *
    -    * @param resourceManagerLeaderId  The fencing token for the ResourceManager leader
    +    * Register a taskExecutor at the resource manager
    +    * @param resourceManagerLeaderId  The fencing token for the ResourceManager leader
         * @param taskExecutorAddress      The address of the TaskExecutor that registers
         * @param resourceID               The resource ID of the TaskExecutor that registers
         *
         * @return The response by the ResourceManager.
         */
        @RpcMethod
    -   public org.apache.flink.runtime.rpc.registration.RegistrationResponse registerTaskExecutor(
    -                   UUID resourceManagerLeaderId,
    -                   String taskExecutorAddress,
    -                   ResourceID resourceID) {
    +   public Future<RegistrationResponse> registerTaskExecutor(
    +           final UUID resourceManagerLeaderId,
    +           final String taskExecutorAddress,
    +           final ResourceID resourceID) {
    +
    +           if(!leaderSessionID.equals(resourceManagerLeaderId)) {
    +                   log.warn("Discard registration from TaskExecutor {} at ({}) because the expected leader session ID {} did not equal the received leader session ID  {}",
    +                           resourceID, taskExecutorAddress, leaderSessionID, resourceManagerLeaderId);
    +                   return Futures.failed(new UnmatchedLeaderSessionIDException(leaderSessionID, resourceManagerLeaderId));
    --- End diff --
    
    Should we fail or decline the registration here? That is, should we either 
send an exception or a `RegistrationResponse.Decline` message?
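
    To make the two options concrete, here is a minimal sketch. It is not the 
Flink code: `FailOrDeclineSketch`, `Response`, `Success` and `Decline` are 
hypothetical stand-ins, `java.util.concurrent.CompletableFuture` replaces the 
`Future`/`Futures` types used in the diff, and `IllegalStateException` stands in 
for `UnmatchedLeaderSessionIDException`.

```java
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-ins; none of these types are the actual Flink classes.
final class FailOrDeclineSketch {

    /** Minimal response hierarchy: a registration is either accepted or declined. */
    interface Response {}

    static final class Success implements Response {}

    static final class Decline implements Response {
        final String reason;
        Decline(String reason) { this.reason = reason; }
    }

    /** Option 1: complete the future exceptionally, so the caller sees a failure. */
    static CompletableFuture<Response> registerFailing(UUID expected, UUID received) {
        if (!expected.equals(received)) {
            CompletableFuture<Response> failed = new CompletableFuture<>();
            failed.completeExceptionally(new IllegalStateException(
                "Leader session ID mismatch: expected " + expected + ", received " + received));
            return failed;
        }
        return CompletableFuture.completedFuture(new Success());
    }

    /** Option 2: complete normally with a Decline, so the caller sees a regular answer. */
    static CompletableFuture<Response> registerDeclining(UUID expected, UUID received) {
        if (!expected.equals(received)) {
            return CompletableFuture.completedFuture(new Decline("Leader session ID mismatch"));
        }
        return CompletableFuture.completedFuture(new Success());
    }
}
```

    With option 1 the TaskExecutor has to handle the mismatch in its error path; 
with option 2 it arrives as an ordinary response that can carry a reason.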


> ResourceManager registration with TaskExecutor
> ----------------------------------------------
>
>                 Key: FLINK-4535
>                 URL: https://issues.apache.org/jira/browse/FLINK-4535
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Cluster Management
>            Reporter: zhangjing
>            Assignee: zhangjing
>
> When a TaskExecutor registers at the ResourceManager, it passes the following 3 input 
> parameters:
> 1. resourceManagerLeaderId: the fencing token for the ResourceManager leader, 
> as known to the TaskExecutor that sends the registration
> 2. taskExecutorAddress: the address of the TaskExecutor
> 3. resourceID: the resource ID of the TaskExecutor that registers
> The ResourceManager needs to process the registration in the following 
> steps:
> 1. Check whether the input resourceManagerLeaderId is the same as the current 
> leadershipSessionId of the ResourceManager. If not, two or more 
> ResourceManagers may exist at the same time and the current ResourceManager is 
> not the proper one, so it rejects or ignores the registration.
> 2. Check whether a valid TaskExecutor exists at the given address by 
> connecting to the address. Reject registrations from invalid addresses.
> 3. Check by the input resourceID whether this is a duplicate registration. 
> If so, reject the registration.
> 4. Keep the mapping between resourceID and taskExecutorGateway and, 
> optionally, the mapping between resourceID and container in YARN mode.
> 5. Create the connection between the ResourceManager and the TaskExecutor, and 
> ensure it is healthy based on heartbeat RPC calls between RM and TM?
> 6. Send a registration-successful ack to the TaskExecutor.
> Discussion:
> Maybe we need to introduce an errorCode or several registration decline 
> subclasses to distinguish the different causes of a declined registration.
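
Steps 1 to 4 and 6 of the issue description, together with the errorCode idea 
from its discussion, could be sketched roughly as follows. This is an 
illustration under assumptions, not the Flink implementation: 
`RegistrationStepsSketch`, `DeclineReason` and `Result` are hypothetical names, 
resource IDs and addresses are plain strings, the connection check of step 2 is 
reduced to a caller-supplied predicate, and the heartbeat monitoring of step 5 
is left out.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.function.Predicate;

// Hypothetical sketch; names and types are illustrative, not Flink's API.
final class RegistrationStepsSketch {

    /** Distinguishes the causes of a declined registration (the errorCode idea). */
    enum DeclineReason { UNMATCHED_LEADER_SESSION, UNREACHABLE_ADDRESS, DUPLICATE_RESOURCE_ID }

    /** Outcome of a registration attempt; reason is null on success. */
    static final class Result {
        final DeclineReason reason;
        Result(DeclineReason reason) { this.reason = reason; }
        boolean isSuccess() { return reason == null; }
    }

    private final UUID leaderSessionId;
    // Stands in for step 2's attempt to connect to the TaskExecutor address.
    private final Predicate<String> canConnect;
    // Step 4: resourceID -> address, standing in for the gateway mapping.
    private final Map<String, String> registered = new HashMap<>();

    RegistrationStepsSketch(UUID leaderSessionId, Predicate<String> canConnect) {
        this.leaderSessionId = leaderSessionId;
        this.canConnect = canConnect;
    }

    Result register(UUID receivedLeaderId, String address, String resourceId) {
        // Step 1: fencing check against the current leader session ID.
        if (!leaderSessionId.equals(receivedLeaderId)) {
            return new Result(DeclineReason.UNMATCHED_LEADER_SESSION);
        }
        // Step 2: reject registrations from addresses that cannot be reached.
        if (!canConnect.test(address)) {
            return new Result(DeclineReason.UNREACHABLE_ADDRESS);
        }
        // Step 3: reject duplicate registrations for the same resource ID.
        if (registered.containsKey(resourceId)) {
            return new Result(DeclineReason.DUPLICATE_RESOURCE_ID);
        }
        // Step 4: remember the mapping for the newly registered TaskExecutor.
        registered.put(resourceId, address);
        // Step 6: acknowledge the successful registration.
        return new Result(null);
    }
}
```

An enum keeps the decline causes in one response type; separate decline 
subclasses, the other option raised in the discussion, would let each cause 
carry its own extra fields.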



