Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/4823#discussion_r145950011
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/TaskManagerSlot.java
---
@@ -74,22 +83,51 @@ public AllocationID getAllocationId() {
return allocationId;
}
- public void setAllocationId(AllocationID allocationId) {
- this.allocationId = allocationId;
- }
-
public PendingSlotRequest getAssignedSlotRequest() {
return assignedSlotRequest;
}
- public void setAssignedSlotRequest(PendingSlotRequest
assignedSlotRequest) {
- this.assignedSlotRequest = assignedSlotRequest;
- }
-
public InstanceID getInstanceId() {
return taskManagerConnection.getInstanceID();
}
+ public void freeSlot() {
+ Preconditions.checkState(state == State.ALLOCATED, "Slot must
be allocated before freeing it.");
+
+ state = State.FREE;
+ allocationId = null;
+ }
+
+ public void clearPendingSlotRequest() {
+ Preconditions.checkState(state == State.PENDING, "No slot
request to clear.");
+
+ state = State.FREE;
+ assignedSlotRequest = null;
+ }
+
+ public void assignPendingSlotRequest(PendingSlotRequest
pendingSlotRequest) {
+ Preconditions.checkState(state == State.FREE, "Slot must be
free to be assigned a slot request.");
+
+ state = State.PENDING;
+ assignedSlotRequest =
Preconditions.checkNotNull(pendingSlotRequest);
+ }
+
+ public void completeAllocation(AllocationID allocationId) {
+ Preconditions.checkState(state == State.PENDING, "In order to
complete an allocation, the slot has to be allocated.");
+ Preconditions.checkState(Objects.equals(allocationId,
assignedSlotRequest.getAllocationId()), "Mismatch between allocation id of the
pending slot request.");
+
+ state = State.ALLOCATED;
+ this.allocationId = Preconditions.checkNotNull(allocationId);
--- End diff --
But will move it to the top to make it clearer.
---