[ 
https://issues.apache.org/jira/browse/FLINK-23216?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17379124#comment-17379124
 ] 

Chesnay Schepler commented on FLINK-23216:
------------------------------------------

On its own, this mechanism won't be reliable. A slot offer can be in flight 
to the JM when it receives the message that the TM is down; the JM will then 
accept the slot offer as usual, resulting in the same behavior.
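
To make the race concrete, here is a minimal toy model in Java. It is not Flink 
code: ToyJobMaster, notifyTaskManagerLost, offerSlot and the 
rememberLostTaskManagers flag are all hypothetical names invented for this 
sketch. It only shows that an offer arriving after the "TM lost" message is 
still accepted unless the JM keeps some record of the lost TM; it is not meant 
as a proposal for how that record should be kept.

```java
import java.util.HashSet;
import java.util.Set;

/** Toy model of the in-flight slot offer race; hypothetical, not Flink code. */
public class SlotOfferRaceSketch {

    /** Hypothetical stand-in for the JobMaster's slot-offer handling. */
    static final class ToyJobMaster {
        private final Set<String> lostTaskManagers = new HashSet<>();
        private final Set<String> acceptedSlots = new HashSet<>();
        private final boolean rememberLostTaskManagers;

        ToyJobMaster(boolean rememberLostTaskManagers) {
            this.rememberLostTaskManagers = rememberLostTaskManagers;
        }

        /** Called when the RM tells the JM that a TM is gone. */
        void notifyTaskManagerLost(String tmId) {
            if (rememberLostTaskManagers) {
                lostTaskManagers.add(tmId);
            }
            // Drop any slots already accepted from that TM.
            acceptedSlots.removeIf(slot -> slot.startsWith(tmId + "/"));
        }

        /** Returns true if the offered slot is accepted. */
        boolean offerSlot(String tmId, String slotId) {
            if (lostTaskManagers.contains(tmId)) {
                return false; // offer from a TM already reported as lost
            }
            return acceptedSlots.add(tmId + "/" + slotId);
        }
    }

    public static void main(String[] args) {
        // Assumed message order: "TM lost" arrives first, then the offer
        // that was already in flight when the TM died.
        ToyJobMaster naive = new ToyJobMaster(false);
        naive.notifyTaskManagerLost("tm-1");
        System.out.println("naive accepts late offer: " + naive.offerSlot("tm-1", "slot-0"));

        ToyJobMaster guarded = new ToyJobMaster(true);
        guarded.notifyTaskManagerLost("tm-1");
        System.out.println("guarded accepts late offer: " + guarded.offerSlot("tm-1", "slot-0"));
    }
}
```

In this model the naive variant accepts the late offer (true) and the guarded 
one rejects it (false), which is the gap described above.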

FLINK-23209 would address this case, and with that ticket in mind the 
benefits of this approach would be diminished, potentially by quite a bit. 
Since split assignments and checkpoints also trigger RPCs, we would detect the 
failed TM sooner than if we relied only on heartbeats, and potentially quickly 
enough to render this ticket unnecessary.

Considering that the exact configuration setup for FLINK-23209 is still under 
discussion, maybe we should first reach a conclusion there to see whether or 
not it could address this issue as well.

Implementation concerns:

The required information isn't readily accessible in the 
DeclarativeSlotManager though, so we'd need to beef up the tracker interfaces.

Do we need a separate RPC for that, or could we re-use 
{{JMGateway#disconnectTaskManager}}?


> RM keeps allocating and freeing slots after a TM lost until its heartbeat 
> timeout
> ---------------------------------------------------------------------------------
>
>                 Key: FLINK-23216
>                 URL: https://issues.apache.org/jira/browse/FLINK-23216
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Coordination
>    Affects Versions: 1.13.1, 1.12.4
>            Reporter: Gen Luo
>            Priority: Major
>
> In Flink 1.13, it's observed that the ResourceManager keeps allocating and 
> freeing slots with a new TM when it's notified by yarn that a TM is lost. The 
> behavior will continue until JM marks the TM as FAILED when its heartbeat 
> timeout is reached. It can be easily reproduced by enlarging the 
> akka.ask.timeout and heartbeat.timeout, for example to 10 min.
>  
> After tracking this down, we find the procedure should be like this:
> 1. When a TM is killed, yarn will first receive the event and notify the RM.
> 2. In Flink 1.13, RM uses declarative resource management to manage the 
>    slots. It will find a lack of resources when receiving the notification, 
>    and then request a new TM from yarn.
> 3. RM will then require the new TM to connect and offer slots to JM.
> 4. But from JM's point of view, all slots are fulfilled, since the lost TM 
>    is not considered disconnected until the heartbeat timeout is reached, 
>    so JM will reject all slot offers.
> 5. The new TM will find no slot serving the JM, then disconnect from the JM.
> 6. RM will then find a lack of resources again and go back to step 3, 
>    requiring the new TM to connect and offer slots to JM, but it won't 
>    request another new TM from yarn.
>  
> The original log is lost, but it looked like this:
> o.a.f.r.r.s.DefaultSlotStatusSyncer - Freeing slot xxx.
> ...(repeated for several lines, for different slots)...
> o.a.f.r.r.s.DefaultSlotStatusSyncer - Starting allocation of slot xxx from 
> container_xxx for job xxx.
> ...(repeated for several lines, for different slots)...
>  
> This could be fixed in several ways, such as notifying the JM as well when 
> the RM receives a TM lost notification, or having TMs not offer slots until 
> required. But all of these have side effects, so they may need further 
> discussion. 
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
