This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b5c9eda9fc94b2ac25c24deae1eec119e72f12c5
Author: Chesnay Schepler <[email protected]>
AuthorDate: Tue Dec 15 17:48:34 2020 +0100

    [hotfix][coordination] Ease debugging of errors during slot allocation
    
    When transitioning from PENDING to ALLOCATED, we now first verify that the 
job ID matches, because a mismatched job ID is the more serious issue.
    Furthermore, if either the slot state or the job ID condition is not met, we 
now print better error messages.
    
    Finally, log the state transitions of all slots (at TRACE level).
---
 .../runtime/resourcemanager/slotmanager/DefaultSlotTracker.java      | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotTracker.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotTracker.java
index 131243f..8467092 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotTracker.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotTracker.java
@@ -173,8 +173,8 @@ public class DefaultSlotTracker implements SlotTracker {
 
        private void transitionSlotToAllocated(DeclarativeTaskManagerSlot slot, 
JobID jobId) {
                Preconditions.checkNotNull(slot);
-               Preconditions.checkState(slot.getState() == SlotState.PENDING);
-               Preconditions.checkState(jobId.equals(slot.getJobId()));
+               Preconditions.checkState(jobId.equals(slot.getJobId()), "Job ID 
from slot status updated (%s) does not match currently assigned job ID (%s) for 
slot %s.", jobId, slot.getJobId(), slot.getSlotId());
+               Preconditions.checkState(slot.getState() == SlotState.PENDING, 
"State of slot %s must be %s, but was %s.", slot.getSlotId(), 
SlotState.PENDING, slot.getState());
 
                slot.completeAllocation();
                slotStatusUpdateListeners.notifySlotStatusChange(slot, 
SlotState.PENDING, SlotState.ALLOCATED, jobId);
@@ -267,6 +267,7 @@ public class DefaultSlotTracker implements SlotTracker {
 
                @Override
                public void notifySlotStatusChange(TaskManagerSlotInformation 
slot, SlotState previous, SlotState current, JobID jobId) {
+                       LOG.trace("Slot {} transitioned from {} to {} for job 
{}.", slot.getSlotId(), previous, current, jobId);
                        listeners.forEach(listeners -> 
listeners.notifySlotStatusChange(slot, previous, current, jobId));
                }
        }

Reply via email to