GitHub user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/4823#discussion_r145117281
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
---
@@ -556,14 +537,30 @@ private void registerSlot(
* @return True if the slot could be updated; otherwise false
*/
private boolean updateSlot(SlotID slotId, AllocationID allocationId) {
- TaskManagerSlot slot = slots.get(slotId);
+ final TaskManagerSlot slot = slots.get(slotId);
- if (null != slot) {
- // we assume the given allocation id to be the ground truth (coming from the TM)
- slot.setAllocationId(allocationId);
+ if (slot != null) {
+ final TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(slot.getInstanceId());
+
+ if (taskManagerRegistration != null) {
+ updateSlotInternal(slot, taskManagerRegistration, allocationId);
+
+ return true;
+ } else {
+ throw new IllegalStateException("Trying to update a slot from a TaskManager " +
--- End diff --
same as above
---