Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/4887#discussion_r148553179
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
---
@@ -399,6 +399,26 @@ public void disconnectJobManager(final JobID jobId,
final Exception cause) {
}
@Override
+ public void cancelSlotRequest(JobID jobID, JobMasterId jobMasterId,
AllocationID allocationID) {
+
+ // As the slot allocations are async, it can not avoid all
redundent slots, but should best effort.
+ JobManagerRegistration jobManagerRegistration =
jobManagerRegistrations.get(jobID);
+
+ if (null != jobManagerRegistration) {
+ if (Objects.equals(jobMasterId,
jobManagerRegistration.getJobMasterId())) {
+ log.info("Cancel slot request for job {} with
allocation id {}.",
+ jobID, allocationID);
+
+ slotManager.unregisterSlotRequest(allocationID);
+ } else {
+ log.info("Job manager {} is not the leader of
job {}.", jobMasterId, jobID);
+ }
+ } else {
+ log.warn("Could not find registered job manager for job
{}.", jobID);
+ }
--- End diff --
This could be simplified to
`slotManager.unregisterSlotRequest(allocationId)` if we change
`cancelSlotRequest(JobID, JobMasterId, AllocationID)` to
`cancelSlotRequest(AllocationID)`.
---