[ 
https://issues.apache.org/jira/browse/FLINK-4449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15434842#comment-15434842
 ] 

ASF GitHub Bot commented on FLINK-4449:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2410#discussion_r76047656
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java
 ---
    @@ -94,26 +126,142 @@ public RegistrationResponse apply(final 
JobMasterGateway jobMasterGateway) {
         * @return Slot assignment
         */
        @RpcMethod
    -   public SlotAssignment requestSlot(SlotRequest slotRequest) {
    -           System.out.println("SlotRequest: " + slotRequest);
    -           return new SlotAssignment();
    +   public AcknowledgeSlotRequest requestSlot(SlotRequest slotRequest) {
    +           return new 
AcknowledgeSlotRequest(slotRequest.getAllocationID());
        }
     
     
        /**
    +    * Register a {@link 
org.apache.flink.runtime.rpc.taskexecutor.TaskExecutor} at the resource manager.
         *
    -    * @param resourceManagerLeaderId  The fencing token for the 
ResourceManager leader 
    -    * @param taskExecutorAddress      The address of the TaskExecutor that 
registers
    -    * @param resourceID               The resource ID of the TaskExecutor 
that registers
    -    *
    +    * @param resourceManagerLeaderId The fencing token for the 
ResourceManager leader
    +    * @param taskExecutorAddress     The address of the TaskExecutor that 
registers
    +    * @param resourceID              The resource ID of the TaskExecutor 
that registers
         * @return The response by the ResourceManager.
         */
        @RpcMethod
    -   public org.apache.flink.runtime.rpc.registration.RegistrationResponse 
registerTaskExecutor(
    -                   UUID resourceManagerLeaderId,
    -                   String taskExecutorAddress,
    -                   ResourceID resourceID) {
    +   public 
Future<org.apache.flink.runtime.rpc.registration.RegistrationResponse> 
registerTaskExecutor(
    +           final UUID resourceManagerLeaderId,
    +           final String taskExecutorAddress,
    +           final ResourceID resourceID)
    +   {
    +           log.info("Received taskExecutor registration with resource id 
{} from {}", resourceID, taskExecutorAddress);
    +           Future<TaskExecutorGateway> taskExecutorFuture =
    +                   getRpcService().connect(taskExecutorAddress, 
TaskExecutorGateway.class);
    +
    +           return taskExecutorFuture.map(new Mapper<TaskExecutorGateway, 
org.apache.flink.runtime.rpc.registration.RegistrationResponse>() {
    +                   @Override
    +                   public 
org.apache.flink.runtime.rpc.registration.RegistrationResponse apply(
    +                           final TaskExecutorGateway taskExecutorGateway)
    +                   {
    +                           // decline registration if resourceManager 
cannot connect to the taskExecutor using the given address
    +                           if (taskExecutorGateway == null) {
    +                                   log.warn("ResourceManager {} decline 
taskExecutor registration with resource id {} from {} because cannot connect to 
it using given address",
    +                                           getAddress(), resourceID, 
taskExecutorAddress);
    +                                   return new 
org.apache.flink.runtime.rpc.registration.RegistrationResponse.Decline("cannot 
connect to taskExecutor using given address");
    +                           } else {
    +                                   // register target taskExecutor to 
heartbeat manager
    +                                   taskExecutorGateways.put(resourceID, 
taskExecutorGateway);
    +                                   long heartbeatInterval = 
heartbeatManager.registerTarget(resourceID, taskExecutorGateway, 
taskExecutorAddress);
    +                                   return new 
TaskExecutorRegistrationSuccess(new InstanceID(), heartbeatInterval);
    +                           }
    +                   }
    +           }, getMainThreadExecutionContext());
    +   }
    +
    +   /**
    +    * notify lost heartbeat with specified taskExecutor
    +    *
    +    * @param resourceID identify the taskManager which lost heartbeat with
    +    */
    +   void notifyLostHeartbeat(final ResourceID resourceID) {
    +           runAsync(new Runnable() {
    +                   @Override
    +                   public void run() {
    +                           TaskExecutorGateway failedTaskManager = 
taskExecutorGateways.remove(resourceID);
    +                           if (failedTaskManager != null) {
    +                                   
heartbeatManager.stopHeartbeatToTaskExecutor(resourceID);
    +                                   
failedTaskManager.markedFailed(leaderSessionID);
    +                           }
    +                   }
    +           });
    +   }
    +
    +
    +   /**
    +    * notify slotReport which is sent by taskManager to resourceManager
    +    *
    +    * @param slotReport the slot allocation report from taskManager
    +    */
    +   void handleSlotReportFromTaskManager(final SlotReport slotReport) {
    +
    +   }
    +
    +
    +   /**
    +    * callback method when current resourceManager is granted leadership
    +    *
    +    * @param newLeaderSessionID unique leadershipID
    +    */
    +   void handleGrantLeadership(final UUID newLeaderSessionID) {
    +           runAsync(new Runnable() {
    +                   @Override
    +                   public void run() {
    +                           log.info("ResourceManager {} was granted 
leadership with leader session ID {}", getAddress(), newLeaderSessionID);
    +                           leaderSessionID = newLeaderSessionID;
    +                           heartbeatManager = new 
ResourceManagerToTaskExecutorHeartbeatManager(ResourceManager.this, 
newLeaderSessionID, log);
    --- End diff --
    
    Why can't the heartbeat manager always be created and only be activated 
with the new leader session ID? Then we would save some object creations.


> Heartbeat Manager between ResourceManager and TaskExecutor
> ----------------------------------------------------------
>
>                 Key: FLINK-4449
>                 URL: https://issues.apache.org/jira/browse/FLINK-4449
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Cluster Management
>            Reporter: zhangjing
>            Assignee: zhangjing
>
> HeartbeatManager is responsible for the heartbeat between the ResourceManager 
> and the TaskExecutor.
> 1. Register TaskExecutors
> Register heartbeat targets. If the heartbeat response for these targets is 
> not reported in time, mark the target as failed and notify the ResourceManager.
> 2. Trigger heartbeat
> Trigger the heartbeat from the ResourceManager to the TaskExecutor periodically.
> The TaskExecutor reports its slot allocation in the heartbeat response.
> The ResourceManager syncs its own slot allocation with the heartbeat response.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to