[FLINK-8748] [flip6] Cancel slot allocations for alternatively completed slot 
requests

If a pending slot request in the SlotPool is fulfilled with a different
AllocatedSlot than the one requested from the ResourceManager, then we cancel
the now superfluous slot request sent to the ResourceManager.

This closes #5561.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/107c8e04
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/107c8e04
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/107c8e04

Branch: refs/heads/master
Commit: 107c8e04be86e9fd893a5c9e0f9c528d1453c3de
Parents: e04639d
Author: Till Rohrmann <[email protected]>
Authored: Thu Feb 22 14:10:29 2018 +0100
Committer: Till Rohrmann <[email protected]>
Committed: Sat Feb 24 15:04:55 2018 +0100

----------------------------------------------------------------------
 .../runtime/executiongraph/ExecutionGraph.java  |  2 +-
 .../runtime/jobmaster/slotpool/SlotPool.java    | 21 +++++++++++++-------
 .../jobmaster/slotpool/SlotSharingManager.java  |  4 ++--
 .../slotmanager/SlotManager.java                |  4 +++-
 4 files changed, 20 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/107c8e04/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 9313466..3d69d71 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -993,7 +993,7 @@ public class ExecutionGraph implements AccessExecutionGraph 
{
                                        } else {
                                                resultThrowable = 
strippedThrowable;
                                        }
-
+                                       
                                        throw new 
CompletionException(resultThrowable);
                                });
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/107c8e04/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
index 6ba9e8a..ea816b5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
@@ -200,6 +200,7 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway, AllocatedS
 
        @Override
        public CompletableFuture<Void> postStop() {
+               log.info("Stopping SlotPool.");
                // cancel all pending allocations
                Set<AllocationID> allocationIds = pendingRequests.keySetB();
 
@@ -222,6 +223,8 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway, AllocatedS
         */
        @Override
        public void suspend() {
+               log.info("Suspending SlotPool.");
+
                validateRunsInMainThread();
 
                // suspend this RPC endpoint
@@ -338,7 +341,8 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway, AllocatedS
                                                allocationTimeout);
                                } else {
                                        multiTaskSlotLocality = 
allocateMultiTaskSlot(
-                                               task.getJobVertexId(), 
multiTaskSlotManager,
+                                               task.getJobVertexId(),
+                                               multiTaskSlotManager,
                                                resourceProfile,
                                                locationPreferences,
                                                allowQueuedScheduling,
@@ -691,8 +695,10 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway, AllocatedS
                pendingRequests.put(pendingRequest.getSlotRequestId(), 
allocationId, pendingRequest);
 
                pendingRequest.getAllocatedSlotFuture().whenComplete(
-                       (value, throwable) -> {
-                               if (throwable != null) {
+                       (AllocatedSlot allocatedSlot, Throwable throwable) -> {
+                               if (throwable != null || 
!allocationId.equals(allocatedSlot.getAllocationId())) {
+                                       // cancel the slot request if there is 
a failure or if the pending request has
+                                       // been completed with another 
allocated slot
                                        
resourceManagerGateway.cancelSlotRequest(allocationId);
                                }
                        });
@@ -747,6 +753,7 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway, AllocatedS
 
        @Override
        public CompletableFuture<Acknowledge> releaseSlot(SlotRequestId 
slotRequestId, @Nullable SlotSharingGroupId slotSharingGroupId, Throwable 
cause) {
+               log.debug("Releasing slot with slot request id {}.", 
slotRequestId, cause);
 
                if (slotSharingGroupId != null) {
                        final SlotSharingManager multiTaskSlotManager = 
slotSharingManagers.get(slotSharingGroupId);
@@ -757,10 +764,10 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway, AllocatedS
                                if (taskSlot != null) {
                                        taskSlot.release(cause);
                                } else {
-                                       log.debug("Could not find slot {} in 
slot sharing group {}. Ignoring release slot request.", slotRequestId, 
slotSharingGroupId, cause);
+                                       log.debug("Could not find slot {} in 
slot sharing group {}. Ignoring release slot request.", slotRequestId, 
slotSharingGroupId);
                                }
                        } else {
-                               log.debug("Could not find slot sharing group 
{}. Ignoring release slot request.", slotSharingGroupId, cause);
+                               log.debug("Could not find slot sharing group 
{}. Ignoring release slot request.", slotSharingGroupId);
                        }
                } else {
                        final PendingRequest pendingRequest = 
removePendingRequest(slotRequestId);
@@ -776,7 +783,7 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway, AllocatedS
                                                
tryFulfillSlotRequestOrMakeAvailable(allocatedSlot);
                                        }
                                } else {
-                                       log.debug("There is no allocated slot 
with allocation id {}. Ignoring the release slot request.", slotRequestId, 
cause);
+                                       log.debug("There is no allocated slot 
with slot request id {}. Ignoring the release slot request.", slotRequestId);
                                }
                        }
                }
@@ -785,7 +792,7 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway, AllocatedS
        }
 
        /**
-        * Checks whether there exists a pending request with the given 
allocation id and removes it
+        * Checks whether there exists a pending request with the given slot 
request id and removes it
         * from the internal data structures.
         *
         * @param requestId identifying the pending request

http://git-wip-us.apache.org/repos/asf/flink/blob/107c8e04/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
index 91ffa8d..204a4e5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
@@ -418,9 +418,9 @@ public class SlotSharingManager {
                private MultiTaskSlot(
                                SlotRequestId slotRequestId,
                                @Nullable AbstractID groupId,
-                               MultiTaskSlot parent,
+                               @Nullable MultiTaskSlot parent,
                                CompletableFuture<? extends SlotContext> 
slotContextFuture,
-                               SlotRequestId allocatedSlotRequestId) {
+                               @Nullable SlotRequestId allocatedSlotRequestId) 
{
                        super(slotRequestId, groupId);
 
                        this.parent = parent;

http://git-wip-us.apache.org/repos/asf/flink/blob/107c8e04/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index 1e6f810..f078a28 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -292,11 +292,13 @@ public class SlotManager implements AutoCloseable {
                PendingSlotRequest pendingSlotRequest = 
pendingSlotRequests.remove(allocationId);
 
                if (null != pendingSlotRequest) {
+                       LOG.debug("Cancel slot request {}.", allocationId);
+
                        cancelPendingSlotRequest(pendingSlotRequest);
 
                        return true;
                } else {
-                       LOG.debug("No pending slot request with allocation id 
{} found.", allocationId);
+                       LOG.debug("No pending slot request with allocation id 
{} found. Ignoring unregistration request.", allocationId);
 
                        return false;
                }

Reply via email to