[ https://issues.apache.org/jira/browse/FLINK-7331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16108806#comment-16108806 ]
ASF GitHub Bot commented on FLINK-7331:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/4444#discussion_r130592145
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---
@@ -653,6 +565,130 @@ UUID getLeaderSessionId() {
// Internal methods
//
------------------------------------------------------------------------
+ /**
+ * Registers a new JobMaster.
+ *
+ * @param jobMasterGateway to communicate with the registering JobMaster
+ * @param jobLeaderId leader id of the JobMaster
+ * @param jobId of the job for which the JobMaster is responsible
+ * @param jobManagerAddress address of the JobMaster
+ * @param jobManagerResourceId ResourceID of the JobMaster
+ * @return RegistrationResponse
+ */
+ private RegistrationResponse registerJobMasterInternal(
+ final JobMasterGateway jobMasterGateway,
+ UUID jobLeaderId,
+ JobID jobId,
+ String jobManagerAddress,
+ ResourceID jobManagerResourceId) {
+ if (jobManagerRegistrations.containsKey(jobId)) {
+ JobManagerRegistration oldJobManagerRegistration =
jobManagerRegistrations.get(jobId);
+
+ if
(oldJobManagerRegistration.getLeaderID().equals(jobLeaderId)) {
+ // same registration
+ log.debug("Job manager {}@{} was already
registered.", jobLeaderId, jobManagerAddress);
+ } else {
+ // tell old job manager that he is no longer
the job leader
+ disconnectJobManager(
+ oldJobManagerRegistration.getJobID(),
+ new Exception("New job leader for job "
+ jobId + " found."));
+
+ JobManagerRegistration jobManagerRegistration =
new JobManagerRegistration(
+ jobId,
+ jobManagerResourceId,
+ jobLeaderId,
+ jobMasterGateway);
+ jobManagerRegistrations.put(jobId,
jobManagerRegistration);
+
jmResourceIdRegistrations.put(jobManagerResourceId, jobManagerRegistration);
+ }
+ } else {
+ // new registration for the job
+ JobManagerRegistration jobManagerRegistration = new
JobManagerRegistration(
+ jobId,
+ jobManagerResourceId,
+ jobLeaderId,
+ jobMasterGateway);
+ jobManagerRegistrations.put(jobId,
jobManagerRegistration);
+ jmResourceIdRegistrations.put(jobManagerResourceId,
jobManagerRegistration);
+ }
+
+ log.info("Registered job manager {}@{} for job {}.",
jobLeaderId, jobManagerAddress, jobId);
+
+ jobManagerHeartbeatManager.monitorTarget(jobManagerResourceId,
new HeartbeatTarget<Void>() {
+ @Override
+ public void receiveHeartbeat(ResourceID resourceID,
Void payload) {
+ // the ResourceManager will always send
heartbeat requests to the JobManager
+ }
+
+ @Override
+ public void requestHeartbeat(ResourceID resourceID,
Void payload) {
+
jobMasterGateway.heartbeatFromResourceManager(resourceID);
+ }
+ });
+
+ return new JobMasterRegistrationSuccess(
+
resourceManagerConfiguration.getHeartbeatInterval().toMilliseconds(),
+ getLeaderSessionId(),
+ resourceId);
+ }
+
+ /**
+ * Registers a new TaskExecutor.
+ *
+ * @param taskExecutorGateway to communicate with the registering
TaskExecutor
+ * @param taskExecutorAddress address of the TaskExecutor
+ * @param taskExecutorResourceId ResourceID of the TaskExecutor
+ * @param slotReport initial slot report from the TaskExecutor
+ * @return RegistrationResponse
+ */
+ private RegistrationResponse registerTaskExecutorInternal(
+ TaskExecutorGateway taskExecutorGateway,
+ String taskExecutorAddress,
+ ResourceID taskExecutorResourceId,
+ SlotReport slotReport) {
+ WorkerRegistration<WorkerType> oldRegistration =
taskExecutors.remove(taskExecutorResourceId);
+ if (oldRegistration != null) {
+ // TODO :: suggest old taskExecutor to stop itself
+ log.info("Replacing old instance of worker for
ResourceID {}", taskExecutorResourceId);
+
+ // remove old task manager registration from slot
manager
+
slotManager.unregisterTaskManager(oldRegistration.getInstanceID());
+ }
+
+ final WorkerType newWorker =
workerStarted(taskExecutorResourceId);
+
+ if(newWorker == null) {
--- End diff --
Good catch. Will fix it.
> Remove Flink's futures from ResourceManager
> -------------------------------------------
>
> Key: FLINK-7331
> URL: https://issues.apache.org/jira/browse/FLINK-7331
> Project: Flink
> Issue Type: Sub-task
> Components: Distributed Coordination
> Affects Versions: 1.4.0
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
> Priority: Minor
> Fix For: 1.4.0
>
>
> Remove only internally used Flink {{Futures}}.
--
This message was sent by Atlassian JIRA (v6.4.14#64029)