Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/4444#discussion_r130548682
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
---
@@ -653,6 +565,130 @@ UUID getLeaderSessionId() {
// Internal methods
//
------------------------------------------------------------------------
+ /**
+ * Registers a new JobMaster.
+ *
+ * @param jobMasterGateway to communicate with the registering JobMaster
+ * @param jobLeaderId leader id of the JobMaster
+ * @param jobId of the job for which the JobMaster is responsible
+ * @param jobManagerAddress address of the JobMaster
+ * @param jobManagerResourceId ResourceID of the JobMaster
+ * @return RegistrationResponse
+ */
+ private RegistrationResponse registerJobMasterInternal(
+ final JobMasterGateway jobMasterGateway,
+ UUID jobLeaderId,
+ JobID jobId,
+ String jobManagerAddress,
+ ResourceID jobManagerResourceId) {
+ if (jobManagerRegistrations.containsKey(jobId)) {
+ JobManagerRegistration oldJobManagerRegistration =
jobManagerRegistrations.get(jobId);
+
+ if
(oldJobManagerRegistration.getLeaderID().equals(jobLeaderId)) {
+ // same registration
+ log.debug("Job manager {}@{} was already
registered.", jobLeaderId, jobManagerAddress);
+ } else {
+ // tell old job manager that he is no longer
the job leader
+ disconnectJobManager(
+ oldJobManagerRegistration.getJobID(),
+ new Exception("New job leader for job "
+ jobId + " found."));
+
+ JobManagerRegistration jobManagerRegistration =
new JobManagerRegistration(
+ jobId,
+ jobManagerResourceId,
+ jobLeaderId,
+ jobMasterGateway);
+ jobManagerRegistrations.put(jobId,
jobManagerRegistration);
+
jmResourceIdRegistrations.put(jobManagerResourceId, jobManagerRegistration);
+ }
+ } else {
+ // new registration for the job
+ JobManagerRegistration jobManagerRegistration = new
JobManagerRegistration(
+ jobId,
+ jobManagerResourceId,
+ jobLeaderId,
+ jobMasterGateway);
+ jobManagerRegistrations.put(jobId,
jobManagerRegistration);
+ jmResourceIdRegistrations.put(jobManagerResourceId,
jobManagerRegistration);
+ }
+
+ log.info("Registered job manager {}@{} for job {}.",
jobLeaderId, jobManagerAddress, jobId);
+
+ jobManagerHeartbeatManager.monitorTarget(jobManagerResourceId,
new HeartbeatTarget<Void>() {
+ @Override
+ public void receiveHeartbeat(ResourceID resourceID,
Void payload) {
+ // the ResourceManager will always send
heartbeat requests to the JobManager
+ }
+
+ @Override
+ public void requestHeartbeat(ResourceID resourceID,
Void payload) {
+
jobMasterGateway.heartbeatFromResourceManager(resourceID);
+ }
+ });
+
+ return new JobMasterRegistrationSuccess(
+
resourceManagerConfiguration.getHeartbeatInterval().toMilliseconds(),
+ getLeaderSessionId(),
+ resourceId);
+ }
+
+ /**
+ * Registers a new TaskExecutor.
+ *
+ * @param taskExecutorGateway to communicate with the registering
TaskExecutor
+ * @param taskExecutorAddress address of the TaskExecutor
+ * @param taskExecutorResourceId ResourceID of the TaskExecutor
+ * @param slotReport initial slot report from the TaskExecutor
+ * @return RegistrationResponse
+ */
+ private RegistrationResponse registerTaskExecutorInternal(
+ TaskExecutorGateway taskExecutorGateway,
+ String taskExecutorAddress,
+ ResourceID taskExecutorResourceId,
+ SlotReport slotReport) {
+ WorkerRegistration<WorkerType> oldRegistration =
taskExecutors.remove(taskExecutorResourceId);
+ if (oldRegistration != null) {
+ // TODO :: suggest old taskExecutor to stop itself
+ log.info("Replacing old instance of worker for
ResourceID {}", taskExecutorResourceId);
+
+ // remove old task manager registration from slot
manager
+
slotManager.unregisterTaskManager(oldRegistration.getInstanceID());
+ }
+
+ final WorkerType newWorker =
workerStarted(taskExecutorResourceId);
+
+ if(newWorker == null) {
--- End diff --
add space after if
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---