[
https://issues.apache.org/jira/browse/FLINK-9324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16470247#comment-16470247
]
ASF GitHub Bot commented on FLINK-9324:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/5980#discussion_r187305983
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlot.java ---
@@ -159,10 +159,51 @@ public SlotSharingGroupId getSlotSharingGroupId() {
* the logical slot.
*
* @param cause of the payload release
- * @return true if the logical slot's payload could be released, otherwise false
*/
@Override
- public boolean release(Throwable cause) {
- return releaseSlot(cause).isDone();
+ public void release(Throwable cause) {
+ if (STATE_UPDATER.compareAndSet(this, State.ALIVE, State.RELEASING)) {
+ signalPayloadRelease(cause);
+ }
+ state = State.RELEASED;
+ releaseFuture.complete(null);
+ }
+
+ private CompletableFuture<?> signalPayloadRelease(Throwable cause) {
+ tryAssignPayload(TERMINATED_PAYLOAD);
+ payload.fail(cause);
+
+ return payload.getTerminalStateFuture();
+ }
+
+ private void returnSlotToOwner(CompletableFuture<?> terminalStateFuture) {
+ final CompletableFuture<Boolean> slotReturnFuture = terminalStateFuture.handle((Object ignored, Throwable throwable) -> {
+ if (state == State.RELEASING) {
+ return slotOwner.returnAllocatedSlot(this);
+ } else {
+ return CompletableFuture.completedFuture(true);
+ }
+ }).thenCompose(Function.identity());
+
+ slotReturnFuture.whenComplete(
+ (Object ignored, Throwable throwable) -> {
+ state = State.RELEASED;
--- End diff --
Yes, this is cleaner.
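
For readers following the diff, the compare-and-set guard in release() can be sketched in isolation. This is a minimal illustration, assuming an AtomicReferenceFieldUpdater over a volatile state field, as the STATE_UPDATER name in the diff suggests. The class ReleaseStateSketch, the payloadSignals counter and tryStartRelease() are hypothetical names, not Flink code.

```java
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

public class ReleaseStateSketch {
    enum State { ALIVE, RELEASING, RELEASED }

    // The updated field must be a volatile, non-static reference field.
    private static final AtomicReferenceFieldUpdater<ReleaseStateSketch, State> STATE_UPDATER =
        AtomicReferenceFieldUpdater.newUpdater(ReleaseStateSketch.class, State.class, "state");

    private volatile State state = State.ALIVE;

    // Hypothetical counter standing in for signalPayloadRelease(cause).
    private volatile int payloadSignals = 0;

    /** Returns true only for the single caller that wins the ALIVE -> RELEASING transition. */
    boolean tryStartRelease() {
        if (STATE_UPDATER.compareAndSet(this, State.ALIVE, State.RELEASING)) {
            payloadSignals++; // reached at most once, because only one CAS can succeed
            return true;
        }
        return false;
    }

    State state() {
        return state;
    }

    int payloadSignals() {
        return payloadSignals;
    }

    public static void main(String[] args) {
        ReleaseStateSketch slot = new ReleaseStateSketch();
        System.out.println("first call wins: " + slot.tryStartRelease());
        System.out.println("second call wins: " + slot.tryStartRelease());
        System.out.println("payload signalled " + slot.payloadSignals() + " time(s)");
    }
}
```

The point of the guard is that repeated or concurrent release calls signal the payload at most once. The later transition to RELEASED is left out of the sketch.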
> SingleLogicalSlot returns completed release future before slot is properly returned
> -----------------------------------------------------------------------------------
>
> Key: FLINK-9324
> URL: https://issues.apache.org/jira/browse/FLINK-9324
> Project: Flink
> Issue Type: Bug
> Components: Distributed Coordination
> Affects Versions: 1.5.0, 1.6.0
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
> Priority: Blocker
> Labels: flip-6
> Fix For: 1.5.0
>
>
> The {{SingleLogicalSlot#releaseSlot}} method returns a future which is
> supposed to be completed once the slot has been returned to the {{SlotOwner}}.
> Unfortunately, we don't wait for the {{SlotOwner}}'s response before completing
> the future; we complete it directly after the call has been made. As a result,
> the {{ExecutionGraph}} can be restarted during a recovery before all of its
> slots have been returned to the {{SlotPool}}. Consequently, allocating the new
> tasks might require more slots than the max parallelism because of collisions
> with old tasks (in case of slot sharing).
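
The difference described in the issue can be sketched with plain CompletableFuture calls. This is a simplified illustration, not the actual Flink implementation. The nested SlotOwner interface is a hypothetical stand-in that only mirrors the returnAllocatedSlot call visible in the diff, and releaseEagerly and releaseAfterReturn are made-up names.

```java
import java.util.concurrent.CompletableFuture;

public class ReleaseFutureSketch {
    /** Hypothetical stand-in for the slot owner; not Flink's real interface. */
    interface SlotOwner {
        CompletableFuture<Boolean> returnAllocatedSlot(Object slot);
    }

    /** Problematic shape: the release future completes right after the call is made. */
    static CompletableFuture<Void> releaseEagerly(SlotOwner owner, Object slot) {
        owner.returnAllocatedSlot(slot); // the owner's response is ignored
        return CompletableFuture.completedFuture(null);
    }

    /** Intended shape: the release future completes only once the owner has responded. */
    static CompletableFuture<Void> releaseAfterReturn(SlotOwner owner, Object slot) {
        return owner.returnAllocatedSlot(slot).thenAccept(ignored -> { });
    }

    public static void main(String[] args) {
        // The owner's response stays pending until we complete it by hand.
        CompletableFuture<Boolean> ownerResponse = new CompletableFuture<>();
        SlotOwner owner = slot -> ownerResponse;
        Object slot = new Object();

        CompletableFuture<Void> eager = releaseEagerly(owner, slot);
        CompletableFuture<Void> chained = releaseAfterReturn(owner, slot);

        System.out.println("eager done before response: " + eager.isDone());
        System.out.println("chained done before response: " + chained.isDone());

        ownerResponse.complete(true);
        System.out.println("chained done after response: " + chained.isDone());
    }
}
```

With the eager variant, anything that waits on the release future (a restart, for example) can proceed while the slot is still on its way back to the owner. The chained variant closes that window.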
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)