GitHub user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5048#discussion_r154499247
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java ---
    @@ -690,6 +696,21 @@ else if (availableSlots.tryRemove(allocationID)) {
                return CompletableFuture.completedFuture(Acknowledge.get());
        }
     
    +   @Override
    +   public void notifyTimeout(AllocatedSlot key, UUID ticket) {
    +           runAsync(() -> {
    +                   if (availableSlots.tryRemove(key.getSlotAllocationId())) {
    +                           LOG.info("Notify unused slot {} time out for task manager.", key.getSlotAllocationId());
    +
    +                           // What shall we do if notify failed?
    +                           // If asking timeout, the heartbeat between TM and RM will fix the slot leaking.
    +                           // If some unexpected exceptions happened, maybe retry is a better choice.
    +                           // Now, just ignore exception checking here.
    +                           key.getTaskManagerGateway().notifySlotUnused(key.getSlotAllocationId());
    --- End diff --
    
    We should at least log if we could not notify the TM.
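
A minimal sketch of what such logging could look like, not the actual Flink code. It assumes that `notifySlotUnused` returns a `CompletableFuture`; the diff does not show its return type. `GatewaySketch`, `notifyAndLog`, and the use of `java.util.logging` are hypothetical stand-ins chosen to keep the example self-contained.

```java
import java.util.concurrent.CompletableFuture;
import java.util.logging.Level;
import java.util.logging.Logger;

public class NotifySlotUnusedSketch {

    private static final Logger LOG =
            Logger.getLogger(NotifySlotUnusedSketch.class.getName());

    /**
     * Hypothetical stand-in for the task manager gateway.
     * Assumption: the notification call returns a future.
     */
    interface GatewaySketch {
        CompletableFuture<Void> notifySlotUnused(String allocationId);
    }

    /**
     * Sends the notification and logs a warning if it fails.
     * The returned future completes with true on success and false on failure,
     * so the failure is recorded instead of being silently dropped.
     */
    static CompletableFuture<Boolean> notifyAndLog(GatewaySketch gateway, String allocationId) {
        return gateway.notifySlotUnused(allocationId).handle((ignored, failure) -> {
            if (failure != null) {
                LOG.log(Level.WARNING,
                        "Could not notify the task manager that slot "
                                + allocationId + " is unused.",
                        failure);
                return false;
            }
            return true;
        });
    }

    public static void main(String[] args) {
        // Simulate a failed notification, for example an ask timeout.
        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("simulated ask timeout"));
        System.out.println(notifyAndLog(id -> failed, "slot-1").join());

        // Simulate a successful notification.
        System.out.println(
                notifyAndLog(id -> CompletableFuture.completedFuture(null), "slot-2").join());
    }
}
```

Whether a failed notification should also be retried is left open here, as in the code comment in the diff; the sketch only makes the failure visible in the log.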

