Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/5980#discussion_r187305983
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlot.java
---
@@ -159,10 +159,51 @@ public SlotSharingGroupId getSlotSharingGroupId() {
* the logical slot.
*
* @param cause of the payload release
- * @return true if the logical slot's payload could be released,
otherwise false
*/
@Override
- public boolean release(Throwable cause) {
- return releaseSlot(cause).isDone();
+ public void release(Throwable cause) {
+ if (STATE_UPDATER.compareAndSet(this, State.ALIVE,
State.RELEASING)) {
+ signalPayloadRelease(cause);
+ }
+ state = State.RELEASED;
+ releaseFuture.complete(null);
+ }
+
+ private CompletableFuture<?> signalPayloadRelease(Throwable cause) {
+ tryAssignPayload(TERMINATED_PAYLOAD);
+ payload.fail(cause);
+
+ return payload.getTerminalStateFuture();
+ }
+
+ private void returnSlotToOwner(CompletableFuture<?>
terminalStateFuture) {
+ final CompletableFuture<Boolean> slotReturnFuture =
terminalStateFuture.handle((Object ignored, Throwable throwable) -> {
+ if (state == State.RELEASING) {
+ return slotOwner.returnAllocatedSlot(this);
+ } else {
+ return CompletableFuture.completedFuture(true);
+ }
+ }).thenCompose(Function.identity());
+
+ slotReturnFuture.whenComplete(
+ (Object ignored, Throwable throwable) -> {
+ state = State.RELEASED;
--- End diff --
yes, this is cleaner
---