Github user GJL commented on a diff in the pull request:
https://github.com/apache/flink/pull/5088#discussion_r155296898
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java
---
@@ -144,6 +144,78 @@ public TaskManagerGateway getTaskManagerGateway() {
return taskManagerGateway;
}
+ /**
+ * Triggers the release of the logical slot.
+ */
+ public void triggerLogicalSlotRelease() {
+ final LogicalSlot logicalSlot = logicalSlotReference.get();
+
+ if (logicalSlot != null) {
+ logicalSlot.releaseSlot();
+ }
+ }
+
+ /**
+ * Releases the logical slot.
+ *
+ * @return true if the logical slot could be released, false otherwise.
+ */
+ public boolean releaseLogicalSlot() {
+ final LogicalSlot logicalSlot = logicalSlotReference.get();
+
+ if (logicalSlot != null) {
+ if (logicalSlot instanceof Slot) {
+ final Slot slot = (Slot) logicalSlot;
+ if (slot.markReleased()) {
+ logicalSlotReference.set(null);
+ return true;
+ }
+ } else {
+ throw new RuntimeException("Unsupported logical
slot type encounterd " + logicalSlot.getClass());
--- End diff --
Typo: *encounterd* (should be *encountered*)
---