Github user StephanEwen commented on a diff in the pull request:
https://github.com/apache/flink/pull/5980#discussion_r187191338
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlot.java
---
@@ -159,10 +159,51 @@ public SlotSharingGroupId getSlotSharingGroupId() {
* the logical slot.
*
* @param cause of the payload release
- * @return true if the logical slot's payload could be released,
otherwise false
*/
@Override
- public boolean release(Throwable cause) {
- return releaseSlot(cause).isDone();
+ public void release(Throwable cause) {
+ if (STATE_UPDATER.compareAndSet(this, State.ALIVE,
State.RELEASING)) {
+ signalPayloadRelease(cause);
+ }
+ state = State.RELEASED;
+ releaseFuture.complete(null);
+ }
+
+ private CompletableFuture<?> signalPayloadRelease(Throwable cause) {
+ tryAssignPayload(TERMINATED_PAYLOAD);
+ payload.fail(cause);
+
+ return payload.getTerminalStateFuture();
+ }
+
+ private void returnSlotToOwner(CompletableFuture<?>
terminalStateFuture) {
+ final CompletableFuture<Boolean> slotReturnFuture =
terminalStateFuture.handle((Object ignored, Throwable throwable) -> {
+ if (state == State.RELEASING) {
--- End diff --
What happens if this gets set concurrently to `RELEASED`? This would only
work if `slotOwner.returnAllocatedSlot(this)` works in both cases (releasing
and released) and the second path (returning the completed future) is the
optimization/fast path if it is already released. (double checking the
assumption).
---