Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/4823#discussion_r145948519
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
---
@@ -418,26 +415,17 @@ public void freeSlot(SlotID slotId, AllocationID allocationId) {
TaskManagerSlot slot = slots.get(slotId);
if (null != slot) {
- if (slot.isAllocated()) {
+ if (slot.getState() == TaskManagerSlot.State.ALLOCATED) {
if (Objects.equals(allocationId, slot.getAllocationId())) {
- // free the slot
- slot.setAllocationId(null);
-
fulfilledSlotRequests.remove(allocationId);
-
- if (slot.isFree()) {
- handleFreeSlot(slot);
- }
TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(slot.getInstanceId());
- if (null != taskManagerRegistration) {
- if (anySlotUsed(taskManagerRegistration.getSlots())) {
- taskManagerRegistration.markUsed();
- } else {
- taskManagerRegistration.markIdle();
- }
+ if (taskManagerRegistration == null) {
+ throw new IllegalStateException("Trying to free a slot from a TaskManager " +
--- End diff --
Yes, will add it.
---