GitHub user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/5048#discussion_r154499247
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java ---
@@ -690,6 +696,21 @@ else if (availableSlots.tryRemove(allocationID)) {
return CompletableFuture.completedFuture(Acknowledge.get());
}
+ @Override
+ public void notifyTimeout(AllocatedSlot key, UUID ticket) {
+ runAsync(() -> {
+ if (availableSlots.tryRemove(key.getSlotAllocationId())) {
+ LOG.info("Notify unused slot {} time out for task manager.", key.getSlotAllocationId());
+
+ // What shall we do if notify failed?
+ // If asking timeout, the heartbeat between TM and RM will fix the slot leaking.
+ // If some unexpected exceptions happened, maybe retry is a better choice.
+ // Now, just ignore exception checking here.
+ key.getTaskManagerGateway().notifySlotUnused(key.getSlotAllocationId());
--- End diff --
We should at least log if we could not notify the TM.
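A minimal sketch of what that logging could look like, assuming `notifySlotUnused` returns a `CompletableFuture` (the diff does not show its return type, so this is an assumption). `Gateway`, `notifyAndLog`, and `LOG_LINES` are hypothetical stand-ins for `TaskManagerGateway`, the body of `notifyTimeout`, and `LOG`; they are not part of the PR.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class NotifyTimeoutSketch {

    /** Hypothetical stand-in for TaskManagerGateway; ASSUMES the call returns a future. */
    interface Gateway {
        CompletableFuture<Void> notifySlotUnused(String allocationId);
    }

    /** Hypothetical stand-in for LOG so the sketch has no logging dependency. */
    static final List<String> LOG_LINES = Collections.synchronizedList(new ArrayList<>());

    /** Sends the notification and records a log line if the returned future fails. */
    static CompletableFuture<Void> notifyAndLog(Gateway gateway, String allocationId) {
        return gateway.notifySlotUnused(allocationId)
            .whenComplete((ignored, failure) -> {
                if (failure != null) {
                    // Real code would call LOG.warn(...) with the throwable here.
                    LOG_LINES.add("Could not notify task manager about unused slot "
                        + allocationId + ": " + failure);
                }
            });
    }

    public static void main(String[] args) {
        // Successful notification: nothing is logged.
        notifyAndLog(id -> CompletableFuture.completedFuture(null), "slot-a");

        // Failed notification (for example an ask timeout): one line is logged.
        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("ask timed out"));
        notifyAndLog(id -> failed, "slot-b");

        LOG_LINES.forEach(System.out::println);
    }
}
```

Attaching the handler with `whenComplete` keeps the failure visible without changing the result of the returned future, so a retry could still be added later if that turns out to be the better choice.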
---