Repository: flink
Updated Branches:
  refs/heads/master 2d19d1100 -> 445cdfd57


[FLINK-8934] [flip6] Properly cancel slot requests of otherwise fulfilled requests

Cancel slot requests at the ResourceManager if the pending request has been
completed with a different allocation.

This closes #5687.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/445cdfd5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/445cdfd5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/445cdfd5

Branch: refs/heads/master
Commit: 445cdfd57941c4c888a6e73356558fd5f11d443c
Parents: 2d19d11
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Tue Mar 13 08:13:44 2018 +0100
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Tue Mar 13 18:17:54 2018 +0100

----------------------------------------------------------------------
 .../runtime/jobmaster/slotpool/SlotPool.java    |  2 +-
 .../jobmaster/slotpool/SlotPoolTest.java        | 26 ++++++++++++++------
 2 files changed, 20 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/445cdfd5/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
index 8a2dd45..42264b5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
@@ -691,7 +691,7 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway, AllocatedS
 
                pendingRequest.getAllocatedSlotFuture().whenComplete(
                        (AllocatedSlot allocatedSlot, Throwable throwable) -> {
-                               if (throwable != null || 
allocationId.equals(allocatedSlot.getAllocationId())) {
+                               if (throwable != null || 
!allocationId.equals(allocatedSlot.getAllocationId())) {
                                        // cancel the slot request if there is 
a failure or if the pending request has
                                        // been completed with another 
allocated slot
                                        
resourceManagerGateway.cancelSlotRequest(allocationId);

http://git-wip-us.apache.org/repos/asf/flink/blob/445cdfd5/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
index c529ceb..c381974 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
@@ -457,16 +457,21 @@ public class SlotPoolTest extends TestLogger {
         * Tests that unused offered slots are directly used to fulfill pending 
slot
         * requests.
         *
-        * <p>See FLINK-8089
+        * <p>Moreover, it tests that the old slot request is canceled.
+        *
+        * <p>See FLINK-8089, FLINK-8934
         */
        @Test
        public void testFulfillingSlotRequestsWithUnusedOfferedSlots() throws 
Exception {
                final SlotPool slotPool = new SlotPool(rpcService, jobId);
 
-               final CompletableFuture<AllocationID> allocationIdFuture = new 
CompletableFuture<>();
+               final ArrayBlockingQueue<AllocationID> allocationIds = new 
ArrayBlockingQueue<>(2);
 
                resourceManagerGateway.setRequestSlotConsumer(
-                       (SlotRequest slotRequest) -> 
allocationIdFuture.complete(slotRequest.getAllocationId()));
+                       (SlotRequest slotRequest) -> 
allocationIds.offer(slotRequest.getAllocationId()));
+
+               final ArrayBlockingQueue<AllocationID> canceledAllocations = 
new ArrayBlockingQueue<>(2);
+               
resourceManagerGateway.setCancelSlotConsumer(canceledAllocations::offer);
 
                final SlotRequestId slotRequestId1 = new SlotRequestId();
                final SlotRequestId slotRequestId2 = new SlotRequestId();
@@ -487,7 +492,7 @@ public class SlotPoolTest extends TestLogger {
                                timeout);
 
                        // wait for the first slot request
-                       final AllocationID allocationId = 
allocationIdFuture.get();
+                       final AllocationID allocationId1 = allocationIds.take();
 
                        CompletableFuture<LogicalSlot> slotFuture2 = 
slotPoolGateway.allocateSlot(
                                slotRequestId2,
@@ -496,6 +501,9 @@ public class SlotPoolTest extends TestLogger {
                                true,
                                timeout);
 
+                       // wait for the second slot request
+                       final AllocationID allocationId2 = allocationIds.take();
+
                        slotPoolGateway.releaseSlot(slotRequestId1, null, null);
 
                        try {
@@ -505,17 +513,21 @@ public class SlotPoolTest extends TestLogger {
                        } catch (ExecutionException ee) {
                                // expected
                                
assertTrue(ExceptionUtils.stripExecutionException(ee) instanceof 
FlinkException);
-
                        }
 
-                       final SlotOffer slotOffer = new SlotOffer(allocationId, 
0, ResourceProfile.UNKNOWN);
+                       assertEquals(allocationId1, canceledAllocations.take());
+
+                       final SlotOffer slotOffer = new 
SlotOffer(allocationId1, 0, ResourceProfile.UNKNOWN);
 
                        
slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID()).get();
 
                        
assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, 
slotOffer).get());
 
                        // the slot offer should fulfill the second slot request
-                       assertEquals(allocationId, 
slotFuture2.get().getAllocationId());
+                       assertEquals(allocationId1, 
slotFuture2.get().getAllocationId());
+
+                       // check that the second slot allocation has been 
canceled
+                       assertEquals(allocationId2, canceledAllocations.take());
                } finally {
                        RpcUtils.terminateRpcEndpoint(slotPool, timeout);
                }

Reply via email to