[FLINK-8748] [flip6] Cancel slot allocations for alternatively completed slot requests
If a slot request is fulfilled with a different AllocatedSlot in the SlotPool, then we cancel the slot request sent to the ResourceManager. This closes #5561. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/107c8e04 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/107c8e04 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/107c8e04 Branch: refs/heads/master Commit: 107c8e04be86e9fd893a5c9e0f9c528d1453c3de Parents: e04639d Author: Till Rohrmann <[email protected]> Authored: Thu Feb 22 14:10:29 2018 +0100 Committer: Till Rohrmann <[email protected]> Committed: Sat Feb 24 15:04:55 2018 +0100 ---------------------------------------------------------------------- .../runtime/executiongraph/ExecutionGraph.java | 2 +- .../runtime/jobmaster/slotpool/SlotPool.java | 21 +++++++++++++------- .../jobmaster/slotpool/SlotSharingManager.java | 4 ++-- .../slotmanager/SlotManager.java | 4 +++- 4 files changed, 20 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/107c8e04/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index 9313466..3d69d71 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -993,7 +993,7 @@ public class ExecutionGraph implements AccessExecutionGraph { } else { resultThrowable = strippedThrowable; } - + throw new CompletionException(resultThrowable); }); } 
http://git-wip-us.apache.org/repos/asf/flink/blob/107c8e04/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java index 6ba9e8a..ea816b5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java @@ -200,6 +200,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS @Override public CompletableFuture<Void> postStop() { + log.info("Stopping SlotPool."); // cancel all pending allocations Set<AllocationID> allocationIds = pendingRequests.keySetB(); @@ -222,6 +223,8 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS */ @Override public void suspend() { + log.info("Suspending SlotPool."); + validateRunsInMainThread(); // suspend this RPC endpoint @@ -338,7 +341,8 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS allocationTimeout); } else { multiTaskSlotLocality = allocateMultiTaskSlot( - task.getJobVertexId(), multiTaskSlotManager, + task.getJobVertexId(), + multiTaskSlotManager, resourceProfile, locationPreferences, allowQueuedScheduling, @@ -691,8 +695,10 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS pendingRequests.put(pendingRequest.getSlotRequestId(), allocationId, pendingRequest); pendingRequest.getAllocatedSlotFuture().whenComplete( - (value, throwable) -> { - if (throwable != null) { + (AllocatedSlot allocatedSlot, Throwable throwable) -> { + if (throwable != null || !allocationId.equals(allocatedSlot.getAllocationId())) { + // cancel the slot request if there is a failure or if the pending request has + // been 
completed with another allocated slot resourceManagerGateway.cancelSlotRequest(allocationId); } }); @@ -747,6 +753,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS @Override public CompletableFuture<Acknowledge> releaseSlot(SlotRequestId slotRequestId, @Nullable SlotSharingGroupId slotSharingGroupId, Throwable cause) { + log.debug("Releasing slot with slot request id {}.", slotRequestId, cause); if (slotSharingGroupId != null) { final SlotSharingManager multiTaskSlotManager = slotSharingManagers.get(slotSharingGroupId); @@ -757,10 +764,10 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS if (taskSlot != null) { taskSlot.release(cause); } else { - log.debug("Could not find slot {} in slot sharing group {}. Ignoring release slot request.", slotRequestId, slotSharingGroupId, cause); + log.debug("Could not find slot {} in slot sharing group {}. Ignoring release slot request.", slotRequestId, slotSharingGroupId); } } else { - log.debug("Could not find slot sharing group {}. Ignoring release slot request.", slotSharingGroupId, cause); + log.debug("Could not find slot sharing group {}. Ignoring release slot request.", slotSharingGroupId); } } else { final PendingRequest pendingRequest = removePendingRequest(slotRequestId); @@ -776,7 +783,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS tryFulfillSlotRequestOrMakeAvailable(allocatedSlot); } } else { - log.debug("There is no allocated slot with allocation id {}. Ignoring the release slot request.", slotRequestId, cause); + log.debug("There is no allocated slot with slot request id {}. 
Ignoring the release slot request.", slotRequestId); } } } @@ -785,7 +792,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS } /** - * Checks whether there exists a pending request with the given allocation id and removes it + * Checks whether there exists a pending request with the given slot request id and removes it * from the internal data structures. * * @param requestId identifying the pending request http://git-wip-us.apache.org/repos/asf/flink/blob/107c8e04/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java index 91ffa8d..204a4e5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java @@ -418,9 +418,9 @@ public class SlotSharingManager { private MultiTaskSlot( SlotRequestId slotRequestId, @Nullable AbstractID groupId, - MultiTaskSlot parent, + @Nullable MultiTaskSlot parent, CompletableFuture<? 
extends SlotContext> slotContextFuture, - SlotRequestId allocatedSlotRequestId) { + @Nullable SlotRequestId allocatedSlotRequestId) { super(slotRequestId, groupId); this.parent = parent; http://git-wip-us.apache.org/repos/asf/flink/blob/107c8e04/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java index 1e6f810..f078a28 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java @@ -292,11 +292,13 @@ public class SlotManager implements AutoCloseable { PendingSlotRequest pendingSlotRequest = pendingSlotRequests.remove(allocationId); if (null != pendingSlotRequest) { + LOG.debug("Cancel slot request {}.", allocationId); + cancelPendingSlotRequest(pendingSlotRequest); return true; } else { - LOG.debug("No pending slot request with allocation id {} found.", allocationId); + LOG.debug("No pending slot request with allocation id {} found. Ignoring unregistration request.", allocationId); return false; }
