GitHub user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/5980#discussion_r187307945
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlot.java
---
@@ -159,10 +159,51 @@ public SlotSharingGroupId getSlotSharingGroupId() {
* the logical slot.
*
* @param cause of the payload release
- * @return true if the logical slot's payload could be released, otherwise false
*/
@Override
- public boolean release(Throwable cause) {
- return releaseSlot(cause).isDone();
+ public void release(Throwable cause) {
+ if (STATE_UPDATER.compareAndSet(this, State.ALIVE, State.RELEASING)) {
+ signalPayloadRelease(cause);
+ }
+ state = State.RELEASED;
+ releaseFuture.complete(null);
+ }
+
+ private CompletableFuture<?> signalPayloadRelease(Throwable cause) {
+ tryAssignPayload(TERMINATED_PAYLOAD);
+ payload.fail(cause);
+
+ return payload.getTerminalStateFuture();
+ }
+
+ private void returnSlotToOwner(CompletableFuture<?> terminalStateFuture) {
+ final CompletableFuture<Boolean> slotReturnFuture = terminalStateFuture.handle((Object ignored, Throwable throwable) -> {
+ if (state == State.RELEASING) {
--- End diff --
After thinking about this part, I think that even a double check won't strictly
prevent this from happening. The problem is that we don't know when the
`SlotOwner` processes the return slot message. The following could happen: we
send the return slot message and afterwards see that the state is still
`RELEASING`. Before the `SlotOwner` processes that message, it triggers the
`AllocatedSlot.Payload#release` call, which releases the slot from the
`SlotOwner`'s side. Only after this has happened does the owner process the
return slot message. Consequently, the slot owner has to handle both cases
(`RELEASING` and `RELEASED`). The double check only makes this interleaving
less likely.
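A minimal Java sketch of what this asks of the slot owner, assuming a hypothetical owner that tracks the slots it holds in a concurrent set. `IdempotentSlotOwnerSketch`, `releaseFromOwnerSide`, and `returnSlot` are illustrative names, not Flink's `SlotOwner` API. Both release paths are idempotent, so the owner behaves the same whether the return message arrives while the logical slot is `RELEASING` or after it is `RELEASED`, and whichever path reaches the owner first.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical slot owner (not Flink's SlotOwner/SlotPool). Both ways a slot
 * can be released are idempotent, so their relative order does not matter.
 */
public class IdempotentSlotOwnerSketch {

    // Slots the owner currently considers allocated.
    private final Set<String> allocatedSlots = ConcurrentHashMap.newKeySet();

    void allocate(String slotId) {
        allocatedSlots.add(slotId);
    }

    /**
     * Owner-side release (the path the comment attributes to
     * AllocatedSlot.Payload#release). Returns true only for the call that
     * actually removed the slot.
     */
    boolean releaseFromOwnerSide(String slotId) {
        return allocatedSlots.remove(slotId);
    }

    /**
     * Handles the return slot message from the logical slot. It deliberately
     * does not depend on whether the sender saw RELEASING or RELEASED: the
     * owner may already have released the slot itself, in which case
     * Set.remove is a no-op that returns false.
     */
    boolean returnSlot(String slotId) {
        return allocatedSlots.remove(slotId);
    }

    boolean holds(String slotId) {
        return allocatedSlots.contains(slotId);
    }

    public static void main(String[] args) {
        IdempotentSlotOwnerSketch owner = new IdempotentSlotOwnerSketch();
        owner.allocate("slot-1");

        // Interleaving from the comment: the return message was sent while the
        // logical slot was still RELEASING, but the owner releases the slot from
        // its own side before it processes that message.
        boolean ownerReleased = owner.releaseFromOwnerSide("slot-1");
        boolean returnHandled = owner.returnSlot("slot-1");

        // Prints "true false false": the late return is tolerated, not an error.
        System.out.println(ownerReleased + " " + returnHandled + " " + owner.holds("slot-1"));
    }
}
```

The sketch makes the owner tolerate the duplicate instead of trying to order the two paths, since a second state check on the sending side only narrows the window.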
---