Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/4887#discussion_r148555569
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java ---
    @@ -363,6 +363,9 @@ private void slotRequestToResourceManagerFailed(AllocationID allocationID, Throw
         private void checkTimeoutSlotAllocation(AllocationID allocationID) {
             PendingRequest request = pendingRequests.remove(allocationID);
             if (request != null && !request.getFuture().isDone()) {
    +            if (resourceManagerGateway != null) {
    +                resourceManagerGateway.cancelSlotRequest(jobId, jobMasterId, allocationID);
--- End diff --
I think we could actually add this call as an exceptional callback that is
triggered by the `TimeoutException`. In `requestSlotFromResourceManager` we
could add something like `future.whenComplete((value, throwable) -> { if
(throwable instanceof TimeoutException) {
resourceManagerGateway.cancelSlotRequest(); } });`
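
To make the idea concrete, here is a minimal, self-contained sketch of the callback pattern using a plain `java.util.concurrent.CompletableFuture`. It is not the actual `SlotPool` code: `SlotRequestTimeoutSketch`, `CancelAction` and `cancelOnTimeout` are hypothetical names that stand in for the gateway call, and the real change would pass `jobId`, `jobMasterId` and `allocationID` as in the diff. The sketch also unwraps `CompletionException`, on the assumption that the callback may be attached to a dependent stage rather than to the future that was completed directly.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeoutException;

class SlotRequestTimeoutSketch {

    /** Hypothetical stand-in for resourceManagerGateway.cancelSlotRequest(...). */
    interface CancelAction {
        void cancelSlotRequest();
    }

    /**
     * Registers a callback that runs the cancel action only if the future
     * completes exceptionally with a TimeoutException.
     */
    static <T> CompletableFuture<T> cancelOnTimeout(CompletableFuture<T> future, CancelAction cancel) {
        return future.whenComplete((value, throwable) -> {
            // Dependent stages wrap the failure in a CompletionException, so look at the cause.
            Throwable cause = (throwable instanceof CompletionException && throwable.getCause() != null)
                    ? throwable.getCause()
                    : throwable;
            if (cause instanceof TimeoutException) {
                cancel.cancelSlotRequest();
            }
        });
    }

    public static void main(String[] args) {
        CompletableFuture<String> slotFuture = new CompletableFuture<>();
        cancelOnTimeout(slotFuture, () -> System.out.println("cancelSlotRequest called"));
        // Simulate the slot request timing out.
        slotFuture.completeExceptionally(new TimeoutException("slot request timed out"));
    }
}
```

With this shape the cancel call lives next to the request itself, so `checkTimeoutSlotAllocation` would only need to fail the future with a `TimeoutException`. The `resourceManagerGateway != null` check from the diff would presumably still be needed wherever the callback is registered.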
---