GitHub user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4823#discussion_r145948519
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java ---
    @@ -418,26 +415,17 @@ public void freeSlot(SlotID slotId, AllocationID allocationId) {
                TaskManagerSlot slot = slots.get(slotId);
     
                if (null != slot) {
    -                   if (slot.isAllocated()) {
    +                   if (slot.getState() == TaskManagerSlot.State.ALLOCATED) {
                                if (Objects.equals(allocationId, slot.getAllocationId())) {
    -                                   // free the slot
    -                                   slot.setAllocationId(null);
    -                                   fulfilledSlotRequests.remove(allocationId);
    -
    -                                   if (slot.isFree()) {
    -                                           handleFreeSlot(slot);
    -                                   }
     
                                        TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(slot.getInstanceId());
     
    -                                   if (null != taskManagerRegistration) {
    -                                           if (anySlotUsed(taskManagerRegistration.getSlots())) {
    -                                                   taskManagerRegistration.markUsed();
    -                                           } else {
    -                                                   taskManagerRegistration.markIdle();
    -                                           }
    +                                   if (taskManagerRegistration == null) {
    +                                           throw new IllegalStateException("Trying to free a slot from a TaskManager " +
    --- End diff --
    
    Yes, will add it.

