[ 
https://issues.apache.org/jira/browse/FLINK-14589?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann reassigned FLINK-14589:
-------------------------------------

    Assignee: Till Rohrmann  (was: Hwanju Kim)

> Redundant slot requests with the same AllocationID leads to inconsistent slot 
> table
> -----------------------------------------------------------------------------------
>
>                 Key: FLINK-14589
>                 URL: https://issues.apache.org/jira/browse/FLINK-14589
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.6.3
>            Reporter: Hwanju Kim
>            Assignee: Till Rohrmann
>            Priority: Major
>             Fix For: 1.10.0, 1.8.3, 1.9.2
>
>
> _NOTE: We found this issue in 1.6.2, but I checked that the relevant code is 
> still in the mainline. What I am not sure of, however, is whether other 
> slot-related fixes after 1.6.2 (such as FLINK-11059 and FLINK-12863) would 
> prevent the initial cause of this issue from happening. So far, I have not 
> found a fix related to the issue I am describing here, so I am opening this 
> one. Please feel free to deduplicate it if another issue already covers it. 
> Please note that we have already picked up FLINK-9912, which turned out to be 
> a major fix to a slot allocation failure issue; I will note its ramifications 
> here in case others experience the same problem._
> h2. Summary
> When *requestSlot* is called from the ResourceManager (RM) to the TaskManager 
> (TM), the TM first reserves the requested slot by marking it as ALLOCATED, 
> offers the slot to the JM, and marks the slot as ACTIVE once it gets an 
> acknowledgement from the JM. This three-way communication for slot allocation 
> is identified by an AllocationID, which is initially generated by the JM. The 
> TM reserves a slot by calling *TaskSlotTable.allocateSlot* if the requested 
> slot number (i.e., slot index) is free to use. The major data structure is 
> *TaskSlot*, indexed by slot index. Once the slot is marked as ALLOCATED with a 
> given AllocationID, it updates other maps such as *allocationIDTaskSlotMap* 
> keyed by AllocationID and *slotsPerJob* keyed by JobID. When updating 
> *allocationIDTaskSlotMap*, it directly calls 
> *allocationIDTaskSlotMap.put(allocationId, taskSlot)*, which may overwrite an 
> existing entry if one is already there with the same AllocationID. This leaves 
> *TaskSlot* and *allocationIDTaskSlotMap* inconsistent: the former says two 
> slots are allocated with the same AllocationID, while the latter says the 
> AllocationID only maps to the latest task slot. In this state, once the slot 
> is freed, *freeSlot* is driven by AllocationID, so it fetches the slot index 
> (i.e., the one from the later request) from *allocationIDTaskSlotMap*, marks 
> that slot free, and removes it from *allocationIDTaskSlotMap*. The old task 
> slot, however, is still marked as allocated. It becomes a zombie and can never 
> be freed. This can cause permanent slot allocation failure if TM slots are 
> statically and tightly provisioned and the resource manager does not actively 
> spawn new TMs when slots are unavailable (e.g., Kubernetes without active mode 
> integration, which is not yet available).
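> To make the failure mode concrete, here is a minimal sketch of the suspected 
> inconsistency (hypothetical, simplified structures; not the actual 
> *TaskSlotTable* code), showing how the unconditional put plus an 
> AllocationID-driven free strands the older slot:
> {code:java}
> import java.util.HashMap;
> import java.util.Map;
> 
> // Minimal sketch (hypothetical, simplified): two maps standing in for the
> // TaskSlot table (slot index -> owner AID) and allocationIDTaskSlotMap.
> public class ZombieSlotSketch {
> 
>     // slot index -> AllocationID of the owner; absent means the slot is free
>     static Map<Integer, String> taskSlots = new HashMap<>();
>     // AllocationID -> slot index (the map that gets overwritten)
>     static Map<String, Integer> allocationIdToSlot = new HashMap<>();
> 
>     static void allocateSlot(int slotIndex, String allocationId) {
>         if (taskSlots.containsKey(slotIndex)) {
>             return;                                  // slot index already taken
>         }
>         taskSlots.put(slotIndex, allocationId);      // mark ALLOCATED
>         // Unconditional put: a second request with the same AllocationID but a
>         // different slot index silently overwrites the first mapping.
>         allocationIdToSlot.put(allocationId, slotIndex);
>     }
> 
>     static void freeSlot(String allocationId) {
>         // Freeing is driven by AllocationID, so only the latest mapping is seen.
>         Integer slotIndex = allocationIdToSlot.remove(allocationId);
>         if (slotIndex != null) {
>             taskSlots.remove(slotIndex);             // mark FREE
>         }
>     }
> 
>     public static void main(String[] args) {
>         allocateSlot(1, "AID1");  // first request reserves Slot1
>         allocateSlot(2, "AID1");  // redundant request reserves Slot2, overwrites map
>         freeSlot("AID1");         // frees only Slot2
> 
>         // Slot1 stays allocated, but no AllocationID points at it any more.
>         System.out.println("Slot1 owner = " + taskSlots.get(1)     // AID1 (zombie)
>                 + ", Slot2 owner = " + taskSlots.get(2)            // null (free)
>                 + ", AID1 -> " + allocationIdToSlot.get("AID1"));  // null
>     }
> }
> {code}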
> h2. Scenario
> From my observation, redundant slot requests with the same AllocationID and 
> different slot indices should be rare, but they can happen due to a race 
> condition, especially when repeated fail-overs and heartbeat timeouts 
> (primarily caused by transient resource overload, not permanent network 
> partition/node outage) are taking place. The following is a detailed scenario 
> that could lead to this issue (AID is AllocationID):
>  # AID1 is requested from JM and put in the pending request queue in RM.
>  # RM picks up slot number 1 (Slot1) from freeSlots and performs requestSlot 
> with Slot1 and AID1. This slot request is in flight.
>  # In the meantime, Slot1 is occupied by AID2 in TM due to a delayed slot 
> request, and TM sends a slot report via heartbeat to RM saying Slot1 is 
> already allocated with AID2.
>  # RM's heartbeat handler identifies that Slot1 is occupied by a different 
> AID (AID2), so it rejects the pending request sent in step 2.
>  # handleFailedSlotRequest puts the rejected AID1 back into the pending 
> requests by retrying the slot request. Now it picks up another available 
> slot, say Slot2, so the retried slot request with Slot2 and AID1 is in flight.
>  # In the meantime, Slot1, occupied by AID2, is freed (by a disconnection 
> from JM, or by releasing all the tasks in the slot on cancellation/failure; 
> the latter was observed).
>  # The in-flight slot request (Slot1, AID1) from step 2 arrives at TM, and it 
> succeeds as Slot1 is free to allocate. TM offers Slot1 to JM, which 
> acknowledges it, so TM marks Slot1 ACTIVE with AID1. At this point, 
> allocationIDTaskSlotMap[AID1] = Slot1 in TM, and JM's allocatedSlots[AID1] = 
> Slot1.
>  # The next in-flight slot request (Slot2, AID1) from step 5 arrives at TM. 
> As Slot2 is still free, TM marks it ALLOCATED, offers Slot2 to JM, and 
> *overwrites allocationIDTaskSlotMap[AID1] to Slot2*.
>  # From step 7, JM has allocatedSlots[AID1] = Slot1, which leads JM to reject 
> the offer, as the same AID is already occupied by another slot.
>  # TM gets the rejected offer for (Slot2, AID1) and frees Slot2. As part of 
> that, it removes allocationIDTaskSlotMap[AID1]. *Here Slot1 is still marked 
> as ALLOCATED with AID1, but allocationIDTaskSlotMap contains nothing for 
> AID1* (condensed in the sketch after this list).
>  # From this point on, RM believes that Slot1 is allocated for AID1, and so 
> does JM, which proceeds with task deployment for AID1. In TM, AID1 is not 
> allocated at all because allocationIDTaskSlotMap[AID1] = null, so task 
> deployment fails with *TaskSubmissionException("No task slot allocated for 
> job ID")*.
>  # Any slot release from JM (e.g., by another heartbeat timeout) removes the 
> allocated slot (Slot1, AID1) from allocatedSlots and availableSlots, while 
> freeing the slot with AID1 in TM is a no-op because 
> allocationIDTaskSlotMap[AID1] = null.
>  # Any further scheduling fails with *NoResourceAvailableException("Could not 
> allocate all requires slots within timeout")*, unless an active resource 
> manager is used.
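> Adding the following JM-side stand-in to the sketch after the summary 
> (hypothetical, simplified; the real path goes through the slot offer RPC and 
> the JM's slot bookkeeping) condenses steps 7-10 and shows why the rejected 
> second offer frees Slot2 while stranding Slot1:
> {code:java}
>     // JM-side bookkeeping (simplified): AllocationID -> slot index already accepted.
>     static Map<String, Integer> allocatedSlots = new HashMap<>();
> 
>     // TM offers a slot it has just reserved; JM rejects if the AID is already bound.
>     static void offerSlot(int slotIndex, String allocationId) {
>         if (allocatedSlots.containsKey(allocationId)) {
>             // Step 9: JM already holds Slot1 for AID1, so the offer of Slot2 is
>             // rejected. Step 10: TM then frees by AllocationID, which now points
>             // at Slot2 only, so Slot2 is released while Slot1 stays ALLOCATED
>             // with no allocationIdToSlot entry left.
>             freeSlot(allocationId);
>             return;
>         }
>         allocatedSlots.put(allocationId, slotIndex); // offer accepted, slot becomes ACTIVE
>     }
> {code}
> Running allocateSlot(1, "AID1"); offerSlot(1, "AID1"); allocateSlot(2, "AID1"); 
> offerSlot(2, "AID1"); against the sketch ends in exactly the state of step 10: 
> Slot1 is still owned by AID1, allocationIdToSlot has no entry for AID1, and a 
> deployment lookup by AID1 returns null (step 11).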
> h2. Repro method
> The repro I have is stress-based rather than deterministic: a constantly 
> failing app (which also does not react to cancellation, to prolong fail-over) 
> combined with a short heartbeat timeout (<=1s) to emulate resource overload 
> without imposing an arbitrary resource load (I enabled DEBUG logging, which 
> could also add a bit more load). Apart from the repro test, the real 
> applications that run into this issue are heavily loaded (high slot 
> oversubscription) and doing continuous fail-overs (caused by a code error).
> h2. Experiment
> I've tested a simple solution: have *TaskSlotTable.allocateSlot* check whether 
> *allocationIDTaskSlotMap* already has a task slot entry for the requested AID 
> and, if so, reject the allocation up front, so that the redundant slot request 
> fails fast without reaching reservation and slot offer, preventing the 
> allocationIDTaskSlotMap overwrite and the inconsistent allocation tables. In 
> the above example, the fix replaces steps 8, 9, and 10 with RM getting a 
> *SlotAllocationException* while handling the failed slot request. It may retry 
> the pending request via handleFailedSlotRequest, but it would stop retrying 
> once the next heartbeat reports that the pending slot was allocated by another 
> AID, or eventually on slot request timeout. My test with the fix has run for a 
> while, hitting this race multiple times, and survived without slot allocation 
> failure.
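> Applied to the simplified sketch above, that guard would make the sketch's 
> *allocateSlot* look roughly like the following (hypothetical; the actual 
> *TaskSlotTable.allocateSlot* signature and its error-reporting path differ):
> {code:java}
>     static boolean allocateSlot(int slotIndex, String allocationId) {
>         Integer existing = allocationIdToSlot.get(allocationId);
>         if (existing != null && existing != slotIndex) {
>             // A different slot is already reserved for this AllocationID: fail
>             // fast so the RM ends up with a SlotAllocationException instead of
>             // the TM overwriting the map and offering a second slot to the JM.
>             return false;
>         }
>         if (taskSlots.containsKey(slotIndex)) {
>             return false;                            // slot index itself taken
>         }
>         taskSlots.put(slotIndex, allocationId);      // mark ALLOCATED
>         allocationIdToSlot.put(allocationId, slotIndex);
>         return true;
>     }
> {code}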
> As my experience is with a somewhat old version (1.6.2), it would be good to 
> discuss whether other unexpected behavior has been observed and which related 
> fixes have already been incorporated in the mainline.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
