[ https://issues.apache.org/jira/browse/FLINK-9047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16409455#comment-16409455 ]

ASF GitHub Bot commented on FLINK-9047:
---------------------------------------

Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5739#discussion_r176400741
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java ---
    @@ -634,6 +653,99 @@ public void testCheckIdleSlot() throws Exception {
                }
        }
     
    +   /**
    +    * Tests that idle slots which cannot be released are only recycled if the owning {@link TaskExecutor}
    +    * is still registered at the {@link SlotPool}. See FLINK-9047.
    +    */
    +   @Test
    +   public void testReleasingIdleSlotFailed() throws Exception {
    +           final ManualClock clock = new ManualClock();
    +           final SlotPool slotPool = new SlotPool(
    +                   rpcService,
    +                   jobId,
    +                   clock,
    +                   TestingUtils.infiniteTime(),
    +                   timeout);
    +
    +           try {
    +                   final SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway);
    +
    +                   final AllocationID expiredAllocationId = new AllocationID();
    +                   final SlotOffer slotToExpire = new SlotOffer(expiredAllocationId, 0, ResourceProfile.UNKNOWN);
    +
    +                   final ArrayDeque<CompletableFuture<Acknowledge>> responseQueue = new ArrayDeque<>(2);
    +                   taskManagerGateway.setFreeSlotFunction((AllocationID allocationId, Throwable cause) -> {
    +                           if (responseQueue.isEmpty()) {
    +                                   return CompletableFuture.completedFuture(Acknowledge.get());
    +                           } else {
    +                                   return responseQueue.pop();
    +                           }
    +                   });
    +
    +                   responseQueue.add(FutureUtils.completedExceptionally(new FlinkException("Test failure")));
    +
    +                   final CompletableFuture<Acknowledge> responseFuture = new CompletableFuture<>();
    +                   responseQueue.add(responseFuture);
    +
    +                   assertThat(
    +                           slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID()).get(),
    +                           Matchers.is(Acknowledge.get()));
    +
    +                   assertThat(
    +                           slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotToExpire).get(),
    +                           Matchers.is(true));
    +
    +                   clock.advanceTime(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
    +
    +                   slotPool.triggerCheckIdleSlot();
    +
    +                   CompletableFuture<LogicalSlot> allocatedSlotFuture = slotPoolGateway.allocateSlot(
    +                           new SlotRequestId(),
    +                           new DummyScheduledUnit(),
    +                           SlotProfile.noRequirements(),
    +                           true,
    +                           timeout);
    +
    +                   // wait until the slot has been fulfilled with the previously idling slot
    +                   final LogicalSlot logicalSlot = allocatedSlotFuture.get();
    +                   assertThat(logicalSlot.getAllocationId(), Matchers.is(expiredAllocationId));
    +
    +                   // return the slot
    +                   slotPool.getSlotOwner().returnAllocatedSlot(logicalSlot).get();
    +
    +                   // advance the time so that the returned slot is now idling
    +                   clock.advanceTime(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
    +
    +                   slotPool.triggerCheckIdleSlot();
    +
    +                   // request a new slot after the idling slot has been released
    +                   allocatedSlotFuture = slotPoolGateway.allocateSlot(
    +                           new SlotRequestId(),
    +                           new DummyScheduledUnit(),
    +                           SlotProfile.noRequirements(),
    +                           true,
    +                           timeout);
    +
    +                   // release the TaskExecutor before we get a response from the slot releasing
    +                   slotPoolGateway.releaseTaskManager(taskManagerLocation.getResourceID()).get();
    +
    +                   // let the slot releasing fail --> since there the owning TaskExecutor is no longer registered
    --- End diff --
    
    nit: *[...] since there the owning [...]*
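The stubbed free-slot function in the test above hands out prepared responses in order: the first release attempt fails immediately, the second stays pending until the test resolves it, and every later call is acknowledged at once. A minimal, self-contained sketch of that queued-response pattern, assuming plain `String` acknowledgements in place of Flink's `Acknowledge` and a hypothetical `queuedStub` helper (neither is part of the Flink API):

```java
import java.util.ArrayDeque;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

public class QueuedResponses {

    /**
     * Hypothetical helper: returns a stub that hands out the prepared responses in
     * FIFO order and, once the queue is drained, acknowledges every further call.
     */
    static Function<String, CompletableFuture<String>> queuedStub(
            ArrayDeque<CompletableFuture<String>> responseQueue) {
        return allocationId -> responseQueue.isEmpty()
                ? CompletableFuture.completedFuture("ACK")
                : responseQueue.pop();
    }

    public static void main(String[] args) {
        ArrayDeque<CompletableFuture<String>> responseQueue = new ArrayDeque<>(2);
        Function<String, CompletableFuture<String>> freeSlot = queuedStub(responseQueue);

        // First call: fails right away.
        CompletableFuture<String> failure = new CompletableFuture<>();
        failure.completeExceptionally(new RuntimeException("Test failure"));
        responseQueue.add(failure);

        // Second call: stays pending until the test decides its outcome.
        CompletableFuture<String> pending = new CompletableFuture<>();
        responseQueue.add(pending);

        System.out.println(freeSlot.apply("alloc-1").isCompletedExceptionally()); // true
        CompletableFuture<String> second = freeSlot.apply("alloc-1");
        System.out.println(second.isDone());                                      // false
        System.out.println(freeSlot.apply("alloc-1").join());                     // ACK

        // The test can now fail the in-flight release at a moment of its choosing.
        pending.completeExceptionally(new RuntimeException("late failure"));
        System.out.println(second.isCompletedExceptionally());                    // true
    }
}
```

Keeping the second response pending is what lets the test unregister the TaskExecutor between sending the release request and receiving its (failed) answer.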


> SlotPool can fail to release slots
> ----------------------------------
>
>                 Key: FLINK-9047
>                 URL: https://issues.apache.org/jira/browse/FLINK-9047
>             Project: Flink
>          Issue Type: Bug
>          Components: Distributed Coordination
>    Affects Versions: 1.5.0
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>            Priority: Blocker
>              Labels: flip-6
>             Fix For: 1.5.0
>
>
> The {{SlotPool}} releases idling slots. If the release operation fails (e.g. due to a 
> timeout), then it simply continues using the slot. This is problematic if the 
> owning {{TaskExecutor}} failed before and was unregistered from the {{SlotPool}} 
> in the meantime. As a result, the {{SlotPool}} will keep reusing the slot, and 
> whenever it tries to release the slot because it is idling, the release will fail 
> again. This effectively renders the scheduling of a job impossible.
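The fix exercised by `testReleasingIdleSlotFailed` comes down to one extra check when a release attempt fails: recycle the slot only if its owning TaskExecutor is still registered. Below is a heavily simplified, hypothetical model of that rule; the class `IdleSlotRelease`, its fields and its methods are illustrative assumptions, not Flink's `SlotPool` API. Note that the slot being released sits outside `availableSlots`, so unregistering its TaskExecutor does not clean it up, which is the window described in the issue.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

public class IdleSlotRelease {

    /** Hypothetical slot: an allocation id plus the id of its owning TaskExecutor. */
    static final class Slot {
        final String allocationId;
        final String taskManagerId;

        Slot(String allocationId, String taskManagerId) {
            this.allocationId = allocationId;
            this.taskManagerId = taskManagerId;
        }
    }

    final Set<String> registeredTaskManagers = new HashSet<>();
    final List<Slot> availableSlots = new ArrayList<>();

    /** Unregisters a TaskExecutor and drops its available slots (in-flight releases are untouched). */
    void unregisterTaskManager(String taskManagerId) {
        registeredTaskManagers.remove(taskManagerId);
        availableSlots.removeIf(slot -> slot.taskManagerId.equals(taskManagerId));
    }

    /** Takes an idle slot out of the pool and reacts to the outcome of the release request. */
    void releaseIdleSlot(Slot slot, CompletableFuture<Void> freeSlotResponse) {
        availableSlots.remove(slot);
        freeSlotResponse.whenComplete((ignored, failure) -> {
            if (failure == null) {
                return; // released on the TaskExecutor: forget the slot
            }
            // Release failed. Recycling unconditionally would keep a slot whose owner is gone,
            // so only put it back if the owning TaskExecutor is still registered.
            if (registeredTaskManagers.contains(slot.taskManagerId)) {
                availableSlots.add(slot);
            }
        });
    }

    public static void main(String[] args) {
        IdleSlotRelease pool = new IdleSlotRelease();
        pool.registeredTaskManagers.add("tm-1");
        Slot slot = new Slot("alloc-1", "tm-1");
        pool.availableSlots.add(slot);

        // First release attempt fails while tm-1 is registered: the slot is recycled.
        CompletableFuture<Void> first = new CompletableFuture<>();
        pool.releaseIdleSlot(slot, first);
        first.completeExceptionally(new RuntimeException("Test failure"));
        System.out.println(pool.availableSlots.contains(slot)); // true

        // Second attempt: tm-1 is unregistered before the failing response arrives.
        CompletableFuture<Void> second = new CompletableFuture<>();
        pool.releaseIdleSlot(slot, second);
        pool.unregisterTaskManager("tm-1");
        second.completeExceptionally(new RuntimeException("Test failure"));
        System.out.println(pool.availableSlots.contains(slot)); // false
    }
}
```

In this model, dropping the `registeredTaskManagers.contains(...)` guard reproduces the reported behaviour: the second `println` would print `true`, and the unreachable slot would keep cycling through idle checks.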



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
