This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit b5c9eda9fc94b2ac25c24deae1eec119e72f12c5 Author: Chesnay Schepler <[email protected]> AuthorDate: Tue Dec 15 17:48:34 2020 +0100 [hotfix][coordination] Ease debugging of errors during slot allocation When transitioning from PENDING to ALLOCATED we now first verify that the job ID is matching because this is a more serious issues. Furthermore, if either the slot state of job ID conditions are not met we now print better error messages. Finally, log the state transitions of all slots. --- .../runtime/resourcemanager/slotmanager/DefaultSlotTracker.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotTracker.java index 131243f..8467092 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotTracker.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotTracker.java @@ -173,8 +173,8 @@ public class DefaultSlotTracker implements SlotTracker { private void transitionSlotToAllocated(DeclarativeTaskManagerSlot slot, JobID jobId) { Preconditions.checkNotNull(slot); - Preconditions.checkState(slot.getState() == SlotState.PENDING); - Preconditions.checkState(jobId.equals(slot.getJobId())); + Preconditions.checkState(jobId.equals(slot.getJobId()), "Job ID from slot status updated (%s) does not match currently assigned job ID (%s) for slot %s.", jobId, slot.getJobId(), slot.getSlotId()); + Preconditions.checkState(slot.getState() == SlotState.PENDING, "State of slot %s must be %s, but was %s.", slot.getSlotId(), SlotState.PENDING, slot.getState()); slot.completeAllocation(); slotStatusUpdateListeners.notifySlotStatusChange(slot, SlotState.PENDING, SlotState.ALLOCATED, jobId); @@ -267,6 +267,7 @@ public class DefaultSlotTracker implements SlotTracker { @Override public void notifySlotStatusChange(TaskManagerSlotInformation slot, SlotState previous, SlotState current, JobID jobId) { + LOG.trace("Slot {} transitioned from {} to {} for job {}.", slot.getSlotId(), previous, current, jobId); listeners.forEach(listeners -> listeners.notifySlotStatusChange(slot, previous, current, jobId)); } }
