Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/4823#discussion_r145119294
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
---
@@ -556,14 +537,30 @@ private void registerSlot(
* @return True if the slot could be updated; otherwise false
*/
private boolean updateSlot(SlotID slotId, AllocationID allocationId) {
- TaskManagerSlot slot = slots.get(slotId);
+ final TaskManagerSlot slot = slots.get(slotId);
- if (null != slot) {
- // we assume the given allocation id to be the ground
truth (coming from the TM)
- slot.setAllocationId(allocationId);
+ if (slot != null) {
+ final TaskManagerRegistration taskManagerRegistration =
taskManagerRegistrations.get(slot.getInstanceId());
+
+ if (taskManagerRegistration != null) {
+ updateSlotInternal(slot,
taskManagerRegistration, allocationId);
+
+ return true;
+ } else {
+ throw new IllegalStateException("Trying to
update a slot from a TaskManager " +
+ slot.getInstanceId() + " which has not
been registered.");
+ }
+ } else {
+ LOG.debug("Trying to update unknown slot with slot id
{}.", slotId);
- if (null != allocationId) {
- if (slot.hasPendingSlotRequest()){
+ return false;
+ }
+ }
+
+ private void updateSlotInternal(TaskManagerSlot slot,
TaskManagerRegistration taskManagerRegistration, @Nullable AllocationID
allocationId) {
--- End diff --
isn't it odd to have an `updateSlotInternal` method, when a private
`updateSlot` method already exists? (more or less a naming issue)
---