GitHub user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/4937#discussion_r148596180
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java ---
@@ -262,23 +263,36 @@ public void disconnectResourceManager() {
// ------------------------------------------------------------------------
@Override
- public CompletableFuture<SimpleSlot> allocateSlot(
- ScheduledUnit task,
+ public CompletableFuture<SimpleSlot> allocateSlot(AllocationID allocationID,
ResourceProfile resources,
Iterable<TaskManagerLocation> locationPreferences,
Time timeout) {
- return internalAllocateSlot(task, resources, locationPreferences);
+ return internalAllocateSlot(allocationID, resources, locationPreferences);
}
@Override
public void returnAllocatedSlot(Slot slot) {
internalReturnAllocatedSlot(slot);
}
+ @Override
+ public void cancelSlotAllocation(AllocationID allocationID) {
+ waitingForResourceManager.remove(allocationID);
+
+ removePendingRequestWithException(allocationID, new CancellationException("Allocation " + allocationID + " cancelled"));
+
+ if (allocatedSlots.contains(allocationID)) {
+ Slot slot = allocatedSlots.get(allocationID);
--- End diff --
We could avoid the `contains` call by simply calling `get` and then comparing
the result against `null`.
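
To illustrate the suggestion, here is a minimal, self-contained sketch. It is not the actual `SlotPool` code: the class `SlotLookupSketch`, its method names, and the use of a plain `HashMap` with `String` keys and values in place of `AllocationID` and `Slot` are all assumptions made for the example. It also assumes that `get` returns `null` for an unknown ID and that `null` is never stored as a value.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only: a plain Map stands in for SlotPool's allocatedSlots,
// and String stands in for both AllocationID and Slot (hypothetical names).
public class SlotLookupSketch {
    private final Map<String, String> allocatedSlots = new HashMap<>();

    public void put(String allocationId, String slot) {
        allocatedSlots.put(allocationId, slot);
    }

    // Pattern from the diff: a membership check followed by a second lookup.
    public String findWithContains(String allocationId) {
        if (allocatedSlots.containsKey(allocationId)) {
            return allocatedSlots.get(allocationId);
        }
        return null;
    }

    // Suggested pattern: a single lookup, then a null check.
    public String findWithGet(String allocationId) {
        String slot = allocatedSlots.get(allocationId);
        if (slot != null) {
            return slot; // the real code would handle the slot here
        }
        return null;
    }
}
```

Both methods return the same result as long as no `null` values are stored; the second one just does a single lookup instead of two.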
---