[ 
https://issues.apache.org/jira/browse/FLINK-12865?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16865681#comment-16865681
 ] 

Till Rohrmann commented on FLINK-12865:
---------------------------------------

Thanks for opening this issue [~gaoyunhaii].

I can see how the first problem occurs:

1. Create {{SlotReport}} with {{SlotStatus(0, free)}}
2. Receive {{TaskExecutor#requestSlot(0, allocationId)}}
3. Acknowledge {{requestSlot}} and update the slot state in {{TaskManagerSlot}} to 
{{ALLOCATED}}
4. Send the now stale {{SlotReport}} (slot still free) to the RM, because the 
report is sent from a different thread
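
The four steps above can be replayed deterministically in a small sketch. All types here are hypothetical stand-ins, not Flink's actual classes, and the RM-side handler is assumed to overwrite its view with whatever the report says:

```java
/** Hypothetical, simplified model of the race; these are not Flink's actual classes. */
public class StaleSlotReportSketch {

    enum SlotState { FREE, ALLOCATED }

    /** Immutable snapshot of one slot, standing in for a SlotStatus inside a SlotReport. */
    static final class SlotSnapshot {
        final int slotIndex;
        final SlotState state;

        SlotSnapshot(int slotIndex, SlotState state) {
            this.slotIndex = slotIndex;
            this.state = state;
        }
    }

    /** RM-side bookkeeping for a single slot (assumption: reports overwrite the state). */
    static final class ResourceManagerView {
        SlotState slotState = SlotState.FREE;

        void onRequestSlotAcknowledged() {
            slotState = SlotState.ALLOCATED;
        }

        void onSlotReport(SlotSnapshot report) {
            slotState = report.state;
        }
    }

    /** Replays steps 1 to 4 in order and returns what the RM believes afterwards. */
    static SlotState replayRace() {
        SlotState tmSlot = SlotState.FREE;
        ResourceManagerView rm = new ResourceManagerView();

        SlotSnapshot report = new SlotSnapshot(0, tmSlot); // 1. snapshot taken while FREE
        tmSlot = SlotState.ALLOCATED;                      // 2. TM processes requestSlot
        rm.onRequestSlotAcknowledged();                    // 3. RM marks the slot ALLOCATED
        rm.onSlotReport(report);                           // 4. stale report arrives last

        return rm.slotState;
    }

    public static void main(String[] args) {
        System.out.println("TM: ALLOCATED, RM: " + replayRace());
    }
}
```

In this model {{replayRace()}} returns {{FREE}} although the TM-side slot is {{ALLOCATED}}, which is the inconsistency described in the issue.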

However, I do not fully understand the second problem. If the message is lost 
(which should actually not happen with TCP as the underlying transport), then 
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java#L801
 should either time out and trigger a retry or the heartbeat report should 
complete the pending slot request.

Concerning [~xiaogang.shi]'s 
[comment|https://issues.apache.org/jira/browse/FLINK-12863?focusedCommentId=16865239&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16865239],
 I'm not quite sure whether I fully understand it. I think if you have a 
pending slot request assigned to a slot and you receive a {{SlotStatus}} which 
says that this slot is {{FREE}}, then you don't cancel an assigned pending slot 
request: 
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java#L625.
 Could you write down the exact sequence of steps that produces the problematic 
case?

Instead of adding versioning as a solution, I was wondering whether a 
simpler fix would be to remove the concurrency from the {{HeartbeatManager}}. 
I think this is currently causing the problems because we can generate a 
{{SlotReport}} at an earlier point in time and send it back at a later point 
in time, when the state of the {{TaskExecutor}} has already changed. Executing 
all {{HeartbeatManager}}-related calls in the actor's main thread should, thus, 
solve this problem.
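
As a rough illustration of that idea (not the actual {{HeartbeatManager}} or {{TaskExecutor}} code), the sketch below uses a single-threaded executor as a stand-in for the actor's main thread. The class name, the {{outbox}} list and the message strings are all hypothetical. Because creating and "sending" the report happen inside one main-thread task, no slot state change can slip in between them:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch; a single-threaded executor stands in for the actor's main thread. */
public class MainThreadHeartbeatSketch {

    enum SlotState { FREE, ALLOCATED }

    private final ExecutorService mainThread = Executors.newSingleThreadExecutor();

    // Both fields are only ever touched from tasks running on mainThread.
    private SlotState slotState = SlotState.FREE;
    private final List<String> outbox = new ArrayList<>(); // messages "sent" to the RM, in order

    /** Slot allocation and its acknowledgement run as one main-thread task. */
    void requestSlot() {
        mainThread.execute(() -> {
            slotState = SlotState.ALLOCATED;
            outbox.add("ack:" + slotState);
        });
    }

    /** Report creation and sending run as one main-thread task, so the report cannot go stale. */
    void requestHeartbeat() {
        mainThread.execute(() -> outbox.add("report:" + slotState));
    }

    /** Reads the outbox from the main thread after all earlier tasks, then shuts down. */
    List<String> drainOutbox() {
        Callable<List<String>> snapshot = () -> new ArrayList<>(outbox);
        try {
            return mainThread.submit(snapshot).get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            mainThread.shutdown();
        }
    }

    static List<String> demo() {
        MainThreadHeartbeatSketch tm = new MainThreadHeartbeatSketch();
        tm.requestHeartbeat(); // slot is still FREE
        tm.requestSlot();      // slot becomes ALLOCATED
        tm.requestHeartbeat(); // must observe ALLOCATED
        return tm.drainOutbox();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this model the order of messages in {{outbox}} always matches the order of state changes, so a {{FREE}} report can never be sent after the acknowledgement of an allocation.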

There should also be no problem with the {{PromiseActorRef}} [~xiaogang.shi] 
because the heartbeat response will be directly sent back to the 
{{ResourceManager}}. Since we have [causal transitive 
ordering|https://doc.akka.io/docs/akka/2.5.4/scala/general/message-delivery-reliability.html]
 of messages, the following sequence cannot happen:

1. Create & send {{SlotReport(SlotStatus(0, free))}}
2. Process {{requestSlot}} on TM
3. Update slot status in main thread after receiving response of 
{{requestSlot}} on RM
4. Process heartbeat response with {{SlotReport(SlotStatus(0, free))}} on RM

What still can happen is the following sequence because we receive the result 
from {{TaskExecutorGateway#requestSlot}} via a {{PromiseActorRef}}:

1. Process {{requestSlot}} on TM
2. Create & send {{SlotReport(SlotStatus(0, allocated))}} on TM
3. Process heartbeat response with {{SlotReport(SlotStatus(0, allocated))}} on 
RM
4. Update slot status in main thread after receiving response of 
{{requestSlot}} on RM

This should, however, not be a problem because this would simply complete the 
pending slot request earlier.
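
A minimal sketch of why this ordering is harmless, assuming the RM completes a pending request idempotently. The class, its methods and the allocation id are hypothetical and only model the behaviour described above, not the real {{SlotManager}}:

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical RM-side model of one slot with an assigned pending slot request. */
public class PendingSlotRequestSketch {

    enum SlotState { PENDING, ALLOCATED }

    private SlotState slotState = SlotState.PENDING;
    private final CompletableFuture<String> pendingRequest = new CompletableFuture<>();

    /** Heartbeat path: an "allocated" report completes the pending request early. */
    void onSlotReport(boolean allocated, String allocationId) {
        if (allocated) {
            complete(allocationId);
        }
        // Assumption: a "free" report does not cancel an assigned pending request.
    }

    /** Regular path: the requestSlot acknowledgement arrives. */
    void onRequestSlotAcknowledged(String allocationId) {
        complete(allocationId);
    }

    private void complete(String allocationId) {
        slotState = SlotState.ALLOCATED;
        // complete() returns false and changes nothing if the future is already done.
        pendingRequest.complete(allocationId);
    }

    SlotState slotState() {
        return slotState;
    }

    String completedAllocationId() {
        return pendingRequest.getNow(null);
    }

    /** Replays steps 3 and 4 of the sequence above on the RM side. */
    static PendingSlotRequestSketch demo() {
        PendingSlotRequestSketch rm = new PendingSlotRequestSketch();
        rm.onSlotReport(true, "allocation-1");         // heartbeat response arrives first
        rm.onRequestSlotAcknowledged("allocation-1");  // requestSlot response arrives later
        return rm;
    }

    public static void main(String[] args) {
        PendingSlotRequestSketch rm = demo();
        System.out.println(rm.slotState() + " " + rm.completedAllocationId());
    }
}
```

Whichever message arrives first completes the request; the second one finds it already completed and leaves the state unchanged.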

> State inconsistency between RM and TM on the slot status
> --------------------------------------------------------
>
>                 Key: FLINK-12865
>                 URL: https://issues.apache.org/jira/browse/FLINK-12865
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>            Reporter: Yun Gao
>            Assignee: Yun Gao
>            Priority: Major
>
> There may be state inconsistency between TM and RM due to a race condition and 
> message loss:
>  # When the TM sends a heartbeat, it retrieves the SlotReport in the main 
> thread, but sends the heartbeat in another thread. The slot on the TM may be 
> FREE initially, so the SlotReport reads the FREE state; then the RM requests 
> the slot and marks it as allocated, and the SlotReport finally overrides the 
> allocated status on the RM side wrongly.
>  # When the RM requests a slot, the TM receives the request but the 
> acknowledgement message gets lost. The RM will then think this slot is free. 
>  Both problems may cause the RM to mark an ALLOCATED slot as FREE. This may 
> currently cause additional retries until the state is synchronized after the 
> next heartbeat, and inaccurate resource statistics for the 
> fine-grained resource management in the future.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
