[ https://issues.apache.org/jira/browse/FLINK-4449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15434842#comment-15434842 ]
ASF GitHub Bot commented on FLINK-4449: --------------------------------------- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2410#discussion_r76047656 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java --- @@ -94,26 +126,142 @@ public RegistrationResponse apply(final JobMasterGateway jobMasterGateway) { * @return Slot assignment */ @RpcMethod - public SlotAssignment requestSlot(SlotRequest slotRequest) { - System.out.println("SlotRequest: " + slotRequest); - return new SlotAssignment(); + public AcknowledgeSlotRequest requestSlot(SlotRequest slotRequest) { + return new AcknowledgeSlotRequest(slotRequest.getAllocationID()); } /** + * Register a {@link org.apache.flink.runtime.rpc.taskexecutor.TaskExecutor} at the resource manager. * - * @param resourceManagerLeaderId The fencing token for the ResourceManager leader - * @param taskExecutorAddress The address of the TaskExecutor that registers - * @param resourceID The resource ID of the TaskExecutor that registers - * + * @param resourceManagerLeaderId The fencing token for the ResourceManager leader + * @param taskExecutorAddress The address of the TaskExecutor that registers + * @param resourceID The resource ID of the TaskExecutor that registers * @return The response by the ResourceManager. 
*/ @RpcMethod - public org.apache.flink.runtime.rpc.registration.RegistrationResponse registerTaskExecutor( - UUID resourceManagerLeaderId, - String taskExecutorAddress, - ResourceID resourceID) { + public Future<org.apache.flink.runtime.rpc.registration.RegistrationResponse> registerTaskExecutor( + final UUID resourceManagerLeaderId, + final String taskExecutorAddress, + final ResourceID resourceID) + { + log.info("Received taskExecutor registration with resource id {} from {}", resourceID, taskExecutorAddress); + Future<TaskExecutorGateway> taskExecutorFuture = + getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class); + + return taskExecutorFuture.map(new Mapper<TaskExecutorGateway, org.apache.flink.runtime.rpc.registration.RegistrationResponse>() { + @Override + public org.apache.flink.runtime.rpc.registration.RegistrationResponse apply( + final TaskExecutorGateway taskExecutorGateway) + { + // decline registration if resourceManager cannot connect to the taskExecutor using the given address + if (taskExecutorGateway == null) { + log.warn("ResourceManager {} decline taskExecutor registration with resource id {} from {} because cannot connect to it using given address", + getAddress(), resourceID, taskExecutorAddress); + return new org.apache.flink.runtime.rpc.registration.RegistrationResponse.Decline("cannot connect to taskExecutor using given address"); + } else { + // register target taskExecutor to heartbeat manager + taskExecutorGateways.put(resourceID, taskExecutorGateway); + long heartbeatInterval = heartbeatManager.registerTarget(resourceID, taskExecutorGateway, taskExecutorAddress); + return new TaskExecutorRegistrationSuccess(new InstanceID(), heartbeatInterval); + } + } + }, getMainThreadExecutionContext()); + } + + /** + * notify lost heartbeat with specified taskExecutor + * + * @param resourceID identify the taskManager which lost heartbeat with + */ + void notifyLostHeartbeat(final ResourceID resourceID) { + runAsync(new Runnable() 
{ + @Override + public void run() { + TaskExecutorGateway failedTaskManager = taskExecutorGateways.remove(resourceID); + if (failedTaskManager != null) { + heartbeatManager.stopHeartbeatToTaskExecutor(resourceID); + failedTaskManager.markedFailed(leaderSessionID); + } + } + }); + } + + + /** + * notify slotReport which is sent by taskManager to resourceManager + * + * @param slotReport the slot allocation report from taskManager + */ + void handleSlotReportFromTaskManager(final SlotReport slotReport) { + + } + + + /** + * callback method when current resourceManager is granted leadership + * + * @param newLeaderSessionID unique leadershipID + */ + void handleGrantLeadership(final UUID newLeaderSessionID) { + runAsync(new Runnable() { + @Override + public void run() { + log.info("ResourceManager {} was granted leadership with leader session ID {}", getAddress(), newLeaderSessionID); + leaderSessionID = newLeaderSessionID; + heartbeatManager = new ResourceManagerToTaskExecutorHeartbeatManager(ResourceManager.this, newLeaderSessionID, log); --- End diff -- Why can't the heartbeat manager always be created and only be activated with the new leader session ID? Then we would save some object creations. > Heartbeat Manager between ResourceManager and TaskExecutor > ---------------------------------------------------------- > > Key: FLINK-4449 > URL: https://issues.apache.org/jira/browse/FLINK-4449 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management > Reporter: zhangjing > Assignee: zhangjing > > HeartbeatManager is responsible for the heartbeat between resourceManager and > TaskExecutor > 1. Register taskExecutors > register heartbeat targets. If the heartbeat response for these targets is > not reported in time, mark the target as failed and notify resourceManager > 2. 
Trigger heartbeat > trigger heartbeat from resourceManager to TaskExecutor periodically > taskExecutor reports slot allocation in the heartbeat response > ResourceManager syncs its own slot allocation with the heartbeat response -- This message was sent by Atlassian JIRA (v6.3.4#6332)