Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5739#discussion_r176400741
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java ---
    @@ -634,6 +653,99 @@ public void testCheckIdleSlot() throws Exception {
                }
        }
     
    +   /**
    +    * Tests that idle slots which cannot be released are only recycled if 
the owning {@link TaskExecutor}
    +    * is still registered at the {@link SlotPool}. See FLINK-9047.
    +    */
    +   @Test
    +   public void testReleasingIdleSlotFailed() throws Exception {
    +           final ManualClock clock = new ManualClock();
    +           final SlotPool slotPool = new SlotPool(
    +                   rpcService,
    +                   jobId,
    +                   clock,
    +                   TestingUtils.infiniteTime(),
    +                   timeout);
    +
    +           try {
    +                   final SlotPoolGateway slotPoolGateway = 
setupSlotPool(slotPool, resourceManagerGateway);
    +
    +                   final AllocationID expiredAllocationId = new 
AllocationID();
    +                   final SlotOffer slotToExpire = new 
SlotOffer(expiredAllocationId, 0, ResourceProfile.UNKNOWN);
    +
    +                   final ArrayDeque<CompletableFuture<Acknowledge>> 
responseQueue = new ArrayDeque<>(2);
    +                   taskManagerGateway.setFreeSlotFunction((AllocationID 
allocationId, Throwable cause) -> {
    +                           if (responseQueue.isEmpty()) {
    +                                   return 
CompletableFuture.completedFuture(Acknowledge.get());
    +                           } else {
    +                                   return responseQueue.pop();
    +                           }
    +                   });
    +
    +                   
responseQueue.add(FutureUtils.completedExceptionally(new FlinkException("Test 
failure")));
    +
    +                   final CompletableFuture<Acknowledge> responseFuture = 
new CompletableFuture<>();
    +                   responseQueue.add(responseFuture);
    +
    +                   assertThat(
    +                           
slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID()).get(),
    +                           Matchers.is(Acknowledge.get()));
    +
    +                   assertThat(
    +                           slotPoolGateway.offerSlot(taskManagerLocation, 
taskManagerGateway, slotToExpire).get(),
    +                           Matchers.is(true));
    +
    +                   clock.advanceTime(timeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
    +
    +                   slotPool.triggerCheckIdleSlot();
    +
    +                   CompletableFuture<LogicalSlot> allocatedSlotFuture = 
slotPoolGateway.allocateSlot(
    +                           new SlotRequestId(),
    +                           new DummyScheduledUnit(),
    +                           SlotProfile.noRequirements(),
    +                           true,
    +                           timeout);
    +
    +                   // wait until the slot has been fulfilled with the 
previously idling slot
    +                   final LogicalSlot logicalSlot = 
allocatedSlotFuture.get();
    +                   assertThat(logicalSlot.getAllocationId(), 
Matchers.is(expiredAllocationId));
    +
    +                   // return the slot
    +                   
slotPool.getSlotOwner().returnAllocatedSlot(logicalSlot).get();
    +
    +                   // advance the time so that the returned slot is now 
idling
    +                   clock.advanceTime(timeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
    +
    +                   slotPool.triggerCheckIdleSlot();
    +
    +                   // request a new slot after the idling slot has been 
released
    +                   allocatedSlotFuture = slotPoolGateway.allocateSlot(
    +                           new SlotRequestId(),
    +                           new DummyScheduledUnit(),
    +                           SlotProfile.noRequirements(),
    +                           true,
    +                           timeout);
    +
    +                   // release the TaskExecutor before we get a response 
from the slot releasing
    +                   
slotPoolGateway.releaseTaskManager(taskManagerLocation.getResourceID()).get();
    +
    +                   // let the slot releasing fail --> since there the 
owning TaskExecutor is no longer registered
    --- End diff --
    
    nit: typo in the last comment of the diff, stray "there" — *[...] since there the owning [...]* should read *[...] since the owning [...]*


---

Reply via email to