[ https://issues.apache.org/jira/browse/FLINK-9324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16470247#comment-16470247 ]

ASF GitHub Bot commented on FLINK-9324:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5980#discussion_r187305983
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlot.java ---
    @@ -159,10 +159,51 @@ public SlotSharingGroupId getSlotSharingGroupId() {
         * the logical slot.
         *
         * @param cause of the payload release
    -    * @return true if the logical slot's payload could be released, otherwise false
         */
        @Override
    -   public boolean release(Throwable cause) {
    -           return releaseSlot(cause).isDone();
    +   public void release(Throwable cause) {
    +           if (STATE_UPDATER.compareAndSet(this, State.ALIVE, State.RELEASING)) {
    +                   signalPayloadRelease(cause);
    +           }
    +           state = State.RELEASED;
    +           releaseFuture.complete(null);
    +   }
    +
    +   private CompletableFuture<?> signalPayloadRelease(Throwable cause) {
    +           tryAssignPayload(TERMINATED_PAYLOAD);
    +           payload.fail(cause);
    +
    +           return payload.getTerminalStateFuture();
    +   }
    +
    +   private void returnSlotToOwner(CompletableFuture<?> terminalStateFuture) {
    +           final CompletableFuture<Boolean> slotReturnFuture = terminalStateFuture.handle((Object ignored, Throwable throwable) -> {
    +                   if (state == State.RELEASING) {
    +                           return slotOwner.returnAllocatedSlot(this);
    +                   } else {
    +                           return CompletableFuture.completedFuture(true);
    +                   }
    +           }).thenCompose(Function.identity());
    +
    +           slotReturnFuture.whenComplete(
    +                   (Object ignored, Throwable throwable) -> {
    +                           state = State.RELEASED;
    --- End diff --
    
    yes, this is cleaner
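The new `returnSlotToOwner` in the diff chains `handle(...)` with `thenCompose(Function.identity())`. Below is a minimal, self-contained sketch of that idiom only. `returnAfterTermination` and the `returnSlot` supplier are hypothetical helpers, not Flink API, and the owner's acknowledgement is simulated with a plain `CompletableFuture`. The sketch shows two things: the follow-up step runs even if the payload's terminal-state future fails, and the flattened future completes only once the owner has answered.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.Supplier;

public class HandleThenComposeSketch {

    // Hypothetical helper: once the terminal-state future completes (normally OR
    // exceptionally), trigger the slot return and flatten the nested
    // CompletableFuture<CompletableFuture<Boolean>> into CompletableFuture<Boolean>.
    public static CompletableFuture<Boolean> returnAfterTermination(
            CompletableFuture<?> terminalStateFuture,
            Supplier<CompletableFuture<Boolean>> returnSlot) {
        return terminalStateFuture
            .handle((Object ignored, Throwable throwable) -> returnSlot.get())
            .thenCompose(Function.identity());
    }

    public static void main(String[] args) {
        CompletableFuture<Object> terminal = new CompletableFuture<>();
        // Simulated owner acknowledgement (stands in for the SlotOwner's response).
        CompletableFuture<Boolean> ownerAck = new CompletableFuture<>();
        CompletableFuture<Boolean> returned = returnAfterTermination(terminal, () -> ownerAck);

        terminal.completeExceptionally(new RuntimeException("payload failed"));
        System.out.println(returned.isDone()); // false: still waiting for the owner
        ownerAck.complete(true);
        System.out.println(returned.join()); // true
    }
}
```

Using `handle` instead of `thenCompose` directly lets the slot be returned on both the success and the failure path. `thenCompose(Function.identity())` then only unwraps the future returned by the callback.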


> SingleLogicalSlot returns completed release future before slot is properly returned
> -----------------------------------------------------------------------------------
>
>                 Key: FLINK-9324
>                 URL: https://issues.apache.org/jira/browse/FLINK-9324
>             Project: Flink
>          Issue Type: Bug
>          Components: Distributed Coordination
>    Affects Versions: 1.5.0, 1.6.0
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>            Priority: Blocker
>              Labels: flip-6
>             Fix For: 1.5.0
>
>
> The {{SingleLogicalSlot#releaseSlot}} method returns a future which should be
> completed once the slot has been returned to the {{SlotOwner}}.
> Unfortunately, we don't wait for the {{SlotOwner}}'s response before completing
> the future; instead, we complete it directly after the call has been made. As
> a result, in case of a recovery the {{ExecutionGraph}} can get restarted before
> all of its slots have been returned to the {{SlotPool}}. As a consequence,
> the allocation of the new tasks might require more slots than the max
> parallelism because of collisions with old tasks (in case of slot sharing).
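The issue comes down to when the release future is completed. The simplified sketch below contrasts the eager completion described above with completing only after the owner's response. It assumes a hypothetical single-method `SlotOwner` interface, loosely modelled on Flink's `SlotOwner` but not identical to it, whose `returnAllocatedSlot` future completes once the slot is back in the pool. The `releaseEagerly` and `releaseAndWait` helpers are illustrative only.

```java
import java.util.concurrent.CompletableFuture;

public class ReleaseFutureSketch {

    // Hypothetical, simplified stand-in for a slot owner; the returned future
    // completes once the owner (e.g. a slot pool) has actually taken the slot back.
    public interface SlotOwner {
        CompletableFuture<Boolean> returnAllocatedSlot(Object slot);
    }

    // Problematic variant: the return call is made, but the release future is
    // completed immediately without waiting for the owner's answer.
    public static CompletableFuture<Void> releaseEagerly(SlotOwner owner, Object slot) {
        owner.returnAllocatedSlot(slot);
        return CompletableFuture.completedFuture(null);
    }

    // Waiting variant: the release future completes only after the owner has
    // responded (successfully or exceptionally).
    public static CompletableFuture<Void> releaseAndWait(SlotOwner owner, Object slot) {
        return owner.returnAllocatedSlot(slot).handle((Boolean ignored, Throwable t) -> null);
    }

    public static void main(String[] args) {
        CompletableFuture<Boolean> ownerAck = new CompletableFuture<>();
        SlotOwner owner = slot -> ownerAck; // the owner has not answered yet

        System.out.println(releaseEagerly(owner, "slot").isDone()); // true: completed too early
        CompletableFuture<Void> released = releaseAndWait(owner, "slot");
        System.out.println(released.isDone()); // false
        ownerAck.complete(true);
        System.out.println(released.isDone()); // true
    }
}
```

In this sketch, a caller that restarts work only after `releaseAndWait` completes cannot race ahead of the owner. With `releaseEagerly` it can, which mirrors the collision with old tasks described in the issue.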



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)