Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4823#discussion_r145948351
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/TaskManagerSlot.java
 ---
    @@ -74,22 +83,51 @@ public AllocationID getAllocationId() {
                return allocationId;
        }
     
    -   public void setAllocationId(AllocationID allocationId) {
    -           this.allocationId = allocationId;
    -   }
    -
        public PendingSlotRequest getAssignedSlotRequest() {
                return assignedSlotRequest;
        }
     
    -   public void setAssignedSlotRequest(PendingSlotRequest 
assignedSlotRequest) {
    -           this.assignedSlotRequest = assignedSlotRequest;
    -   }
    -
        public InstanceID getInstanceId() {
                return taskManagerConnection.getInstanceID();
        }
     
    +   public void freeSlot() {
    +           Preconditions.checkState(state == State.ALLOCATED, "Slot must 
be allocated before freeing it.");
    +
    +           state = State.FREE;
    +           allocationId = null;
    +   }
    +
    +   public void clearPendingSlotRequest() {
    +           Preconditions.checkState(state == State.PENDING, "No slot 
request to clear.");
    +
    +           state = State.FREE;
    +           assignedSlotRequest = null;
    +   }
    +
    +   public void assignPendingSlotRequest(PendingSlotRequest 
pendingSlotRequest) {
    +           Preconditions.checkState(state == State.FREE, "Slot must be 
free to be assigned a slot request.");
    +
    +           state = State.PENDING;
    +           assignedSlotRequest = 
Preconditions.checkNotNull(pendingSlotRequest);
    +   }
    +
    +   public void completeAllocation(AllocationID allocationId) {
    +           Preconditions.checkState(state == State.PENDING, "In order to 
complete an allocation, the slot has to be allocated.");
    +           Preconditions.checkState(Objects.equals(allocationId, 
assignedSlotRequest.getAllocationId()), "Mismatch between allocation id of the 
pending slot request.");
    +
    +           state = State.ALLOCATED;
    +           this.allocationId = Preconditions.checkNotNull(allocationId);
    +           assignedSlotRequest = null;
    +   }
    +
    +   public void updateAllocation(AllocationID allocationId) {
    --- End diff --
    
    `updateAllocation` is used to model the state transition from `FREE` to 
`ALLOCATED`. This state transition is necessary, because the ground truth is 
hold by the `TaskExecutor`. Thus, it can be the case that the `ResourceManager` 
receives a `SlotReport` where it says that slot `x` is allocated with this 
`allocationId`. This can be the case, for example, in the RM failover case.


---

Reply via email to