[ https://issues.apache.org/jira/browse/FLINK-14589?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16966466#comment-16966466 ]
Till Rohrmann commented on FLINK-14589:
---------------------------------------

Thanks for reporting this issue [~hwanju]. I think your analysis is spot on and it should still be a problem in Flink > 1.6. I think your solution approach is good and should solve the problem. Would you have time to open a PR against master for it? I've assigned you to the issue.

Redundant slot requests with the same AllocationID lead to an inconsistent slot table
-------------------------------------------------------------------------------------

Key: FLINK-14589
URL: https://issues.apache.org/jira/browse/FLINK-14589
Project: Flink
Issue Type: Bug
Components: Runtime / Coordination
Affects Versions: 1.6.3
Reporter: Hwanju Kim
Priority: Major

_NOTE: We found this issue in 1.6.2, but I checked that the relevant code is still in the mainline. What I am not sure about, however, is whether other slot-related fixes after 1.6.2 (such as FLINK-11059 and FLINK-12863) would prevent the initial cause of this issue from happening. So far I have not found a fix for the issue I am describing here, so I am opening this one; please feel free to deduplicate it if another issue already covers it. Please note that we have already picked up FLINK-9912, which turned out to be a major fix for a slot allocation failure issue. I will note the ramifications for that issue just in case others experience the same problem._

h2. Summary

When *requestSlot* is called from the ResourceManager (RM) to a TaskManager (TM), the TM first reserves the requested slot by marking it ALLOCATED, offers the slot to the JobManager (JM), and marks the slot ACTIVE once it gets an acknowledgement from the JM. This three-way communication for slot allocation is identified by an AllocationID, which is initially generated by the JM. The TM reserves a slot by calling *TaskSlotTable.allocateSlot* if the requested slot number (i.e., slot index) is free to use. The major data structure is *TaskSlot*, indexed by slot index. Once the slot is marked ALLOCATED with a given AllocationID, the TM updates other maps such as *allocationIDTaskSlotMap*, keyed by AllocationID, and *slotsPerJob*, keyed by JobID. When updating *allocationIDTaskSlotMap*, it directly calls *allocationIDTaskSlotMap.put(allocationId, taskSlot)*, which may overwrite an existing entry if one is already there with the same AllocationID. This renders *TaskSlot* and *allocationIDTaskSlotMap* inconsistent: the former says two slots are allocated by the same AllocationID, while the latter maps the AllocationID only to the task slot that arrived last. In this state, once the slot is freed, *freeSlot* is driven by AllocationID, so it fetches the slot index of the later arrival from *allocationIDTaskSlotMap*, marks that slot free, and removes it from *allocationIDTaskSlotMap*. The old task slot, however, is still marked as allocated. It becomes a zombie and can never be freed. This can cause permanent slot allocation failure if TM slots are statically and tightly provisioned and the resource manager does not actively spawn new TMs when slots are unavailable (e.g., Kubernetes without the active-mode integration, which is not yet available).
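To make the inconsistency concrete, below is a minimal, self-contained sketch of the bookkeeping described above. It is not the actual Flink source: the names mirror *TaskSlotTable*, but the types are reduced (AllocationID becomes a plain String, and the map stores slot indices instead of *TaskSlot* objects).

{code:java}
import java.util.HashMap;
import java.util.Map;

/** Simplified stand-in for Flink's TaskSlotTable bookkeeping (illustration only). */
class TaskSlotTableSketch {
    enum State { FREE, ALLOCATED, ACTIVE }

    static final class TaskSlot {
        State state = State.FREE;
        String allocationId; // AllocationID, reduced to a String for this sketch
    }

    final TaskSlot[] taskSlots; // the per-index slot table
    final Map<String, Integer> allocationIDTaskSlotMap = new HashMap<>();

    TaskSlotTableSketch(int numberOfSlots) {
        taskSlots = new TaskSlot[numberOfSlots];
        for (int i = 0; i < numberOfSlots; i++) {
            taskSlots[i] = new TaskSlot();
        }
    }

    boolean allocateSlot(int index, String allocationId) {
        if (taskSlots[index].state != State.FREE) {
            return false; // the slot itself is occupied
        }
        taskSlots[index].state = State.ALLOCATED;
        taskSlots[index].allocationId = allocationId;
        // The problematic update: an unconditional put() silently drops the
        // old mapping when the same AllocationID already owns another index.
        allocationIDTaskSlotMap.put(allocationId, index);
        return true;
    }

    void freeSlot(String allocationId) {
        // Freeing is keyed by AllocationID, so only the latest index is
        // found; an older slot allocated under the same ID stays ALLOCATED.
        Integer index = allocationIDTaskSlotMap.remove(allocationId);
        if (index != null) {
            taskSlots[index].state = State.FREE;
            taskSlots[index].allocationId = null;
        }
    }
}
{code}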
h2. Scenario

From my observation, redundant slot requests with the same AllocationID and different slot indices should be rare, but they can happen under a race condition, especially when repeated fail-over and heartbeat timeouts (primarily caused by transient resource overload, not by permanent network partition/node outage) are taking place. The following is a detailed scenario that can lead to this issue (AID is AllocationID); see the harness sketch after the Repro method for a condensed replay of steps 7 through 11:

# AID1 is requested from JM and put in the pending request queue in RM.
# RM picks up slot number 1 (Slot1) from freeSlots and performs requestSlot with Slot1 and AID1. This slot request is now in flight.
# In the meantime, Slot1 is occupied by AID2 in TM due to a delayed slot request, and TM sends a slot report via heartbeat to RM saying Slot1 is already allocated with AID2.
# RM's heartbeat handler identifies that Slot1 is occupied with a different AID (AID2), so it rejects the pending request sent in step 2.
# handleFailedSlotRequest puts the rejected AID1 into the pending requests again by retrying the slot request. It now picks up another available slot, say Slot2. So the retried slot request with Slot2 and AID1 is in flight.
# In the meantime, Slot1, occupied by AID2, is freed (by any disconnection from JM, or by releasing all the tasks in the slot on cancellation/failure; the latter was observed).
# The in-flight slot request (Slot1, AID1) from step 2 arrives at TM and succeeds, as Slot1 is free to allocate. TM offers Slot1 to JM, which acknowledges it, so TM marks Slot1 ACTIVE with AID1. At this point, allocationIDTaskSlotMap[AID1] = Slot1 in TM, and JM's allocatedSlots[AID1] = Slot1.
# The next in-flight slot request (Slot2, AID1) from step 5 arrives at TM. As Slot2 is still free, TM marks it ALLOCATED, offers Slot2 to JM, and *overwrites allocationIDTaskSlotMap[AID1] to Slot2*.
# Since, per step 7, JM has allocatedSlots[AID1] = Slot1, JM rejects the offer, as the same AID is already occupied by another slot.
# TM gets the rejected offer for (Slot2, AID1) and frees Slot2. As part of that, it removes allocationIDTaskSlotMap[AID1]. *Here Slot1 is still marked ALLOCATED with AID1, but allocationIDTaskSlotMap contains nothing for AID1.*
# From this point on, RM believes that Slot1 is allocated for AID1, and so does JM, which proceeds with task deployment for AID1. In TM, AID1 is not allocated at all, since allocationIDTaskSlotMap[AID1] = null. Task deployment fails with *TaskSubmissionException("No task slot allocated for job ID")*.
# Any slot release from JM (by another heartbeat timeout) removes the allocated slot (Slot1, AID1) from allocatedSlots and availableSlots, while freeing the slot with AID1 in TM is a no-op due to allocationIDTaskSlotMap[AID1] = null.
# Any further scheduling fails with *NoResourceAvailableException("Could not allocate all requires slots within timeout")*, unless an active resource manager is used.

h2. Repro method

My repro is stressed rather than deterministic: a constantly failing app (which also ignores cancellation, to prolong fail-over) combined with a short heartbeat timeout (<=1s) to emulate resource overload without imposing an arbitrary resource load (I enabled DEBUG logging, which could also add a bit more load). Apart from the repro test, the real applications that ran into this issue were heavily loaded (high slot oversubscription) and doing continuous fail-overs (caused by a code error).
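For illustration, the broken end state of the scenario (steps 7, 8, 10, and 11) can be replayed deterministically against the bookkeeping sketch from the Summary section. This hypothetical harness exercises only the table itself, not the RPC timing that the stress repro provokes.

{code:java}
// Run with assertions enabled (java -ea).
TaskSlotTableSketch table = new TaskSlotTableSketch(2);

table.allocateSlot(0, "AID1"); // step 7: delayed (Slot1, AID1) request succeeds
table.allocateSlot(1, "AID1"); // step 8: (Slot2, AID1) overwrites the map entry
table.freeSlot("AID1");        // step 10: rejected offer frees Slot2 only

// Step 11's state: Slot1 is still ALLOCATED under AID1, yet the table no
// longer knows any slot for AID1, so task deployment for AID1 must fail.
assert table.taskSlots[0].state == TaskSlotTableSketch.State.ALLOCATED;
assert !table.allocationIDTaskSlotMap.containsKey("AID1");
{code}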
h2. Experiment

I've tested a simple solution: *TaskSlotTable.allocateSlot* checks whether *allocationIDTaskSlotMap* already has a task slot entry for the requested AID and, if so, rejects the allocation up front, so that the redundant slot request fails fast without reaching slot reservation and the slot offer, preventing the allocationIDTaskSlotMap overwrite and the inconsistent allocation tables. In the above example, the fix replaces steps 8, 9, and 10 with RM getting a *SlotAllocationException* while handling the failed slot request. RM may retry the pending request via handleFailedSlotRequest, but it stops retrying once the next heartbeat reports that the pending slot was allocated by another AID, or once the slot request eventually times out. My test with this fix has run for a while, hitting this race multiple times, and has survived without slot allocation failure.

As my experience is with a somewhat old version (1.6.2), it would be good to discuss what other unexpected things have happened and what other related fixes have been incorporated in the mainline.
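Expressed against the bookkeeping sketch from the Summary section, the checked *allocateSlot* would take roughly the following shape (a sketch of the idea, not the actual patch):

{code:java}
boolean allocateSlot(int index, String allocationId) {
    Integer existing = allocationIDTaskSlotMap.get(allocationId);
    if (existing != null && existing != index) {
        // The same AllocationID already holds a different slot: fail fast so
        // that RM sees a failed request (the SlotAllocationException path)
        // instead of the table being corrupted by an overwriting put().
        return false;
    }
    if (taskSlots[index].state != State.FREE) {
        return false; // the slot itself is occupied by another allocation
    }
    taskSlots[index].state = State.ALLOCATED;
    taskSlots[index].allocationId = allocationId;
    allocationIDTaskSlotMap.put(allocationId, index);
    return true;
}
{code}

With this guard, the second request in the harness above returns false, the overwrite never happens, and Slot1 stays consistently owned by AID1 in both structures.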