[
https://issues.apache.org/jira/browse/FLINK-14589?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Till Rohrmann updated FLINK-14589:
----------------------------------
Fix Version/s: 1.9.2
               1.8.3
               1.10.0
> Redundant slot requests with the same AllocationID leads to inconsistent slot
> table
> -----------------------------------------------------------------------------------
>
> Key: FLINK-14589
> URL: https://issues.apache.org/jira/browse/FLINK-14589
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Coordination
> Affects Versions: 1.6.3
> Reporter: Hwanju Kim
> Assignee: Hwanju Kim
> Priority: Major
> Fix For: 1.10.0, 1.8.3, 1.9.2
>
>
> _NOTE: We found this issue in 1.6.2, but I have checked that the relevant
> code is still in the mainline. What I am not sure about, however, is whether
> other slot-related fixes after 1.6.2 (such as FLINK-11059 and FLINK-12863)
> would prevent the initial cause of this issue from happening. So far I have
> not found a fix for the issue I am describing here, so I am opening this one.
> Please feel free to deduplicate it if another issue already covers it.
> (Please note that we have already picked up FLINK-9912, which turned out to
> be a major fix for the slot allocation failure issue; I note its ramification
> here just in case others experience the same problem.)_
> h2. Summary
> When *requestSlot* is called from ResourceManager (RM) to TaskManager (TM),
> TM first reserves the requested slot, marking it as ALLOCATED, offers the
> slot to JM, and marks the slot as ACTIVE once it gets an acknowledgement from
> JM. This three-way communication for slot allocation is identified by an
> AllocationID, which is initially generated by JM. TM reserves a slot by
> calling *TaskSlotTable.allocateSlot* if the requested slot number (i.e., slot
> index) is free to use. The major data structure is *TaskSlot*, indexed by
> slot index. Once a slot is marked as ALLOCATED with a given AllocationID,
> allocateSlot updates other maps such as *allocationIDTaskSlotMap*, keyed by
> AllocationID, and *slotsPerJob*, keyed by JobID. When updating
> *allocationIDTaskSlotMap*, it directly calls
> *allocationIDTaskSlotMap.put(allocationId, taskSlot)*, which may overwrite an
> existing entry if one is already there with the same AllocationID. This
> leaves *TaskSlot* and *allocationIDTaskSlotMap* inconsistent: the former says
> two slots are allocated by the same AllocationID, while the latter maps the
> AllocationID only to the task slot that arrived later. In this state, once
> the slot is freed, *freeSlot* is driven by AllocationID, so it fetches the
> slot index of the later slot from *allocationIDTaskSlotMap*, marks that slot
> free, and removes it from *allocationIDTaskSlotMap*, but the old task slot is
> still marked as allocated. This old task slot becomes a zombie and can never
> be freed. This can cause permanent slot allocation failure if TM slots are
> statically and tightly provisioned and the resource manager does not
> actively spawn new TMs when none are available (e.g., Kubernetes without
> active mode integration, which is not yet available).
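> Below is a minimal, self-contained sketch of the two TM-side structures
> involved. The names mirror *TaskSlotTable*, but this is illustrative plain
> Java with String AllocationIDs, not the actual Flink code; it only condenses
> the allocation and free paths to show where the unconditional put and the
> AllocationID-driven free interact badly:
> {code:java}
> import java.util.HashMap;
> import java.util.Map;
>
> // Illustrative sketch only: names mirror TaskSlotTable, bodies are condensed.
> public class TaskSlotTableSketch {
>
>     enum State { FREE, ALLOCATED }
>
>     static class TaskSlot {
>         final int index;
>         State state = State.FREE;
>         String allocationId;
>         TaskSlot(int index) { this.index = index; }
>     }
>
>     final Map<Integer, TaskSlot> taskSlots = new HashMap<>();
>     final Map<String, TaskSlot> allocationIDTaskSlotMap = new HashMap<>();
>
>     TaskSlotTableSketch(int numberOfSlots) {
>         for (int i = 0; i < numberOfSlots; i++) {
>             taskSlots.put(i, new TaskSlot(i));
>         }
>     }
>
>     boolean allocateSlot(int index, String allocationId) {
>         TaskSlot slot = taskSlots.get(index);
>         if (slot.state != State.FREE) {
>             return false;
>         }
>         slot.state = State.ALLOCATED;
>         slot.allocationId = allocationId;
>         // Unconditional put: if allocationId is already mapped to another
>         // slot, that entry is silently overwritten and the old slot becomes
>         // a zombie that can never be freed by AllocationID again.
>         allocationIDTaskSlotMap.put(allocationId, slot);
>         return true;
>     }
>
>     void freeSlot(String allocationId) {
>         // freeSlot is driven by AllocationID, so only the latest mapping is freed.
>         TaskSlot slot = allocationIDTaskSlotMap.remove(allocationId);
>         if (slot != null) {
>             slot.state = State.FREE;
>             slot.allocationId = null;
>         }
>     }
> }
> {code}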
> h2. Scenario
> From my observation, redundant slot requests with the same AllocationID and
> different slot indices should be rare, but they can happen in a race
> condition, especially when repeated fail-overs and heartbeat timeouts
> (primarily caused by transient resource overload, not by permanent network
> partition/node outage) are taking place. The following is a detailed scenario
> that can lead to this issue (AID is AllocationID); the driver snippet after
> the list replays steps 7-10 against the sketch above:
> # AID1 is requested from JM and put in the pending request queue in RM.
> # RM picks up slot number 1 (Slot1) from freeSlots and performs requestSlot
> with Slot1 and AID1. Here this slot request is in flight.
> # In the meantime, Slot1 is occupied by AID2 in TM due to a delayed slot
> request, and TM sends a slot report via heartbeat to RM saying Slot1 is
> already allocated with AID2.
> # RM's heartbeat handler identifies that Slot1 is occupied with a different
> AID (AID2), so it rejects the pending request sent in step 2.
> # handleFailedSlotRequest puts the rejected AID1 back into the pending
> requests by retrying the slot request. Now it picks up another available
> slot, say Slot2, so the retried slot request with Slot2 and AID1 is in
> flight.
> # In the meantime, Slot1 occupied by AID2 is freed (by a disconnection from
> JM, or by releasing all the tasks in the slot on cancellation/failure; the
> latter was observed).
> # The in-flight slot request (Slot1, AID1) from step 2 arrives at TM and
> succeeds, as Slot1 is free to allocate. TM offers Slot1 to JM, which
> acknowledges it, so TM marks Slot1 ACTIVE with AID1. At this point,
> allocationIDTaskSlotMap[AID1] = Slot1 in TM and JM's allocatedSlots[AID1] =
> Slot1.
> # The next in-flight slot request (Slot2, AID1) from step 5 arrives at TM.
> As Slot2 is still free, TM marks it ALLOCATED, offers Slot2 to JM, and
> *overwrites allocationIDTaskSlotMap[AID1] to Slot2*.
> # From step 7, JM has allocatedSlots[AID1] = Slot1, which leads JM to reject
> the offer as the same AID is already bound to another slot.
> # TM gets the rejected offer for (Slot2, AID1) and frees Slot2. As part of
> that, it removes allocationIDTaskSlotMap[AID1]. *Here Slot1 is still marked
> as ALLOCATED with AID1 but allocationIDTaskSlotMap contains nothing for AID1.*
> # From this point on, RM believes that Slot1 is allocated for AID1, and so
> does JM, which proceeds with task deployment for AID1. In TM, AID1 is not
> allocated at all since allocationIDTaskSlotMap[AID1] = null. Task deployment
> fails with *TaskSubmissionException("No task slot allocated for job ID")*.
> # Any slot release from JM (by another heartbeat timeout) removes the
> allocated slot (Slot1, AID1) from allocatedSlots and availableSlots, while
> freeing the slot with AID1 in TM is a no-op because
> allocationIDTaskSlotMap[AID1] = null.
> # Any further scheduling fails with *NoResourceAvailableException("Could
> not allocate all requires slots within timeout")*, unless an active resource
> manager is used.
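> Driving the sketch above through steps 7-10 reproduces the inconsistent end
> state (again purely illustrative; in reality these steps go through the
> RM/JM RPCs described above):
> {code:java}
> TaskSlotTableSketch tm = new TaskSlotTableSketch(3);
>
> tm.allocateSlot(1, "AID1");  // step 7: in-flight (Slot1, AID1) arrives and is reserved
> tm.allocateSlot(2, "AID1");  // step 8: redundant (Slot2, AID1) overwrites the map entry
> tm.freeSlot("AID1");         // step 10: JM rejected the offer, TM frees by AllocationID
>
> // Result: Slot1 is still ALLOCATED with AID1 (zombie), but the map has no
> // entry for AID1, so later frees and task deployments for AID1 both fail.
> System.out.println(tm.taskSlots.get(1).state);               // ALLOCATED
> System.out.println(tm.allocationIDTaskSlotMap.get("AID1"));  // null
> {code}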
> h2. Repro method
> The repro I have is more of a stress test than a deterministic one: a
> constantly failing app (which also does not react to cancellation, to prolong
> fail-over) combined with a short heartbeat timeout (<=1s) to emulate resource
> overload without imposing an actual overload (I enabled DEBUG logging, which
> could also add a bit more load). Apart from the repro test, the real
> applications that run into this issue are heavily loaded (high slot
> oversubscription) and do continuous fail-overs (caused by code errors).
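> For reference, the short heartbeat timeout can be set via the standard
> heartbeat options in flink-conf.yaml; the exact values below are only an
> example of what a <=1s setup could look like, not necessarily what I used:
> {code}
> # flink-conf.yaml - aggressive heartbeats to make the race easier to hit
> heartbeat.interval: 500    # default 10000 ms; must stay below the timeout
> heartbeat.timeout: 1000    # default 50000 ms; <=1s as in the repro
> {code}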
> h2. Experiment
> I've tested a simple solution: have *TaskSlotTable.allocateSlot* check
> whether *allocationIDTaskSlotMap* already has a task slot entry for the
> requested AID and, if so, reject the allocation up front, so that the
> redundant slot request fails fast without reaching reservation and slot
> offer, preventing the allocationIDTaskSlotMap overwrite and the inconsistent
> allocation tables. In the above example, the fix replaces steps 8, 9, and 10
> with RM getting a *SlotAllocationException* when handling the failed slot
> request. RM may retry the pending request via handleFailedSlotRequest, but it
> stops retrying once the next heartbeat reports that the pending slot has been
> allocated by another AID, or eventually on slot request timeout. My test with
> the fix has run for a while, hitting this race multiple times, and survived
> without slot allocation failure.
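> In terms of the sketch above, the checked allocation would look roughly like
> the following (a sketch of the idea only, not the actual patch; in the real
> code the rejection surfaces to RM as a *SlotAllocationException* as
> described):
> {code:java}
> boolean allocateSlotChecked(int index, String allocationId) {
>     TaskSlot existing = allocationIDTaskSlotMap.get(allocationId);
>     if (existing != null && existing.index != index) {
>         // A different slot already holds this AllocationID: reject up front
>         // so the redundant request fails fast and never reaches reservation
>         // or the slot offer, keeping both tables consistent.
>         return false;
>     }
>     return allocateSlot(index, allocationId);
> }
> {code}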
> As my experience is with a somewhat old version (1.6.2), it would be good to
> discuss what other unexpected things may have happened and which other
> related fixes have already been incorporated in the mainline.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)