[ https://issues.apache.org/jira/browse/FLINK-4348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15430811#comment-15430811 ]
ASF GitHub Bot commented on FLINK-4348: --------------------------------------- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2389#discussion_r75682769 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java --- @@ -94,26 +130,218 @@ public RegistrationResponse apply(final JobMasterGateway jobMasterGateway) { * @return Slot assignment */ @RpcMethod - public SlotAssignment requestSlot(SlotRequest slotRequest) { - System.out.println("SlotRequest: " + slotRequest); - return new SlotAssignment(); + public AcknowledgeSlotRequest requestSlot(SlotRequest slotRequest) { + // TODO slotManager.requestSlot(slotRequest); + return new AcknowledgeSlotRequest(slotRequest.getAllocationID()); } /** - * - * @param resourceManagerLeaderId The fencing token for the ResourceManager leader - * @param taskExecutorAddress The address of the TaskExecutor that registers - * @param resourceID The resource ID of the TaskExecutor that registers + * @param resourceManagerLeaderId The fencing token for the ResourceManager leader + * @param taskExecutorAddress The address of the TaskExecutor that registers + * @param resourceID The resource ID of the TaskExecutor that registers * * @return The response by the ResourceManager. 
*/ @RpcMethod - public org.apache.flink.runtime.rpc.registration.RegistrationResponse registerTaskExecutor( - UUID resourceManagerLeaderId, - String taskExecutorAddress, - ResourceID resourceID) { + public Future<org.apache.flink.runtime.rpc.registration.RegistrationResponse> registerTaskExecutor( + final UUID resourceManagerLeaderId, + final String taskExecutorAddress, + final ResourceID resourceID + ) { + log.info("received register from taskExecutor {}, address {}", resourceID, taskExecutorAddress); + Future<TaskExecutorGateway> taskExecutorFuture = + getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class); + + return taskExecutorFuture.map(new Mapper<TaskExecutorGateway, org.apache.flink.runtime.rpc.registration.RegistrationResponse>() { + @Override + public org.apache.flink.runtime.rpc.registration.RegistrationResponse apply(final TaskExecutorGateway taskExecutorGateway) { + // decline registration if resourceManager cannot connect to the taskExecutor using the given address + if(taskExecutorGateway == null) { + log.warn("resourceManager {} decline registration from the taskExecutor {}, cannot connect to it using given address {} ", + getAddress(), resourceID, taskExecutorAddress); + return new org.apache.flink.runtime.rpc.registration.RegistrationResponse.Decline("cannot connect to taskExecutor using given address"); + } else { + // save the register taskExecutor gateway + taskExecutorGateways.put(resourceID, taskExecutorGateway); + // schedule the heartbeat with the registered taskExecutor + ResourceManagerToTaskExecutorHeartbeatScheduler heartbeatScheduler = new ResourceManagerToTaskExecutorHeartbeatScheduler( + ResourceManager.this, leaderSessionID, taskExecutorGateway, taskExecutorAddress, resourceID, log); + heartbeatScheduler.start(); + heartbeatSchedulers.put(resourceID, heartbeatScheduler); + return new TaskExecutorRegistrationSuccess(new InstanceID(), heartbeatScheduler.getHeartbeatInterval()); + } + } + }, 
getMainThreadExecutionContext()); + + } + + /** + * notify resource failure to resourceManager, because of two reasons: + * 1. cannot keep heartbeat with taskManager for several times, mark the resource as failed + * 2. in some corner cases, TM will be marked as invalid by cluster manager master(e.g. yarn master), but TM itself does not realize. + * + * @param resourceID identify the taskManager which to stop + */ + @RpcMethod + public void notifyResourceFailure(ResourceID resourceID) { + log.warn("receive failure notification of resource {}", resourceID); + TaskExecutorGateway taskManager = taskExecutorGateways.get(resourceID); + if(taskManager == null) { + // ignore command to stop an unregister taskManager + log.warn("ignore stop taskManager command because {} is unregistered", resourceID); + } else { + taskExecutorGateways.remove(resourceID); + closeHeartbeatToResourceIfExist(resourceID); + // TODO notify slotManager and notify jobMaster, + // TODO slotManager.notifyTaskManagerFailure(resourceID); + taskManager.shutDown(leaderSessionID); + } + } + + /** + * close heartbeat triggers to resource if exist + * @param resourceID which resource need to stop keep heartbeat with + */ + private void closeHeartbeatToResourceIfExist(ResourceID resourceID) { + if(heartbeatSchedulers.containsKey(resourceID)) { + ResourceManagerToTaskExecutorHeartbeatScheduler heartbeatManager = heartbeatSchedulers.get(resourceID); + heartbeatManager.close(); + heartbeatSchedulers.remove(resourceID); + } + } + + /** + * send slotRequest to the taskManager which the given slot is on + * + * @param slotRequest slot request information + * @param slotID which slot is choosen + */ + void requestSlotToTaskManager(final SlotRequest slotRequest, final SlotID slotID) { --- End diff -- maybe the name "sendSlotRequestToTaskManager" is better > Implement communication from ResourceManager to TaskManager > ----------------------------------------------------------- > > Key: FLINK-4348 > URL: 
https://issues.apache.org/jira/browse/FLINK-4348 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management > Reporter: Kurt Young > Assignee: zhangjing > > There are three main kinds of communication initiated from the RM to the TM: > * Heartbeat: the RM uses heartbeats to sync with the TM's slot status. > * SlotRequest: when the RM decides to assign a slot to the JM, it should first try to send > a request to the TM for the slot. The TM can either accept or reject this request. > * FailureNotify: in some corner cases, the TM will be marked as invalid by the > cluster manager master (e.g. the YARN master), but the TM itself does not realize it. The RM > should send a failure notification to the TM so that the TM can terminate itself. -- This message was sent by Atlassian JIRA (v6.3.4#6332)