[
https://issues.apache.org/jira/browse/FLINK-4348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15535918#comment-15535918
]
ASF GitHub Bot commented on FLINK-4348:
---------------------------------------
Github user mxm commented on the issue:
https://github.com/apache/flink/pull/2571
@KurtYoung It is guaranteed that the ResourceManager will receive an RPC
response of some sort. Either a reply from the TaskExecutor, or a timeout/error
which is returned by the future. If the request is then retried, the
TaskExecutor receives the same request twice but will simply acknowledge it
again. The ResourceManager just keeps retrying. In the worst case, the
TaskExecutor has already freed the slot again because the JobManager doesn't
need it anymore. If the TaskExecutor then reports that the slot is available
again, we know that we can stop retrying.
This requires us to keep an extra list of unconfirmed requests to the
TaskExecutor. If the request is still unconfirmed when the slot is free again
or occupied by a different allocation, we can cancel the retrying and delete
the unconfirmed request. This is slightly more complicated than I initially
thought :)
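To make the bookkeeping above concrete, here is a minimal sketch of the extra list of unconfirmed requests on the ResourceManager side. It is only an illustration: the class `UnconfirmedSlotRequests`, its methods, and the use of plain strings for slot and allocation ids are assumptions made for this example, not code from the PR.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch; none of these names come from the Flink code base. */
class UnconfirmedSlotRequests {

    /** Assumed shape of what a TaskExecutor reports about one slot. */
    enum SlotState { FREE, ALLOCATED }

    // slotId -> allocationId of the request that is still being retried
    private final Map<String, String> pending = new HashMap<>();

    /** Record the request before the first RPC is sent. */
    void requestSent(String slotId, String allocationId) {
        pending.put(slotId, allocationId);
    }

    /** The TaskExecutor acknowledged the request, so there is nothing left to retry. */
    void acknowledged(String slotId, String allocationId) {
        pending.remove(slotId, allocationId);
    }

    /**
     * Called when the TaskExecutor reports the state of a slot. Retrying is
     * cancelled if the slot is free again or held by a different allocation.
     */
    void slotReported(String slotId, SlotState state, String reportedAllocationId) {
        String waitingFor = pending.get(slotId);
        if (waitingFor == null) {
            return; // no unconfirmed request for this slot
        }
        boolean freeAgain = state == SlotState.FREE;
        boolean takenByOther =
                state == SlotState.ALLOCATED && !waitingFor.equals(reportedAllocationId);
        if (freeAgain || takenByOther) {
            pending.remove(slotId);
        }
    }

    /** The retry loop asks this before sending the same request again. */
    boolean shouldRetry(String slotId, String allocationId) {
        return allocationId.equals(pending.get(slotId));
    }
}
```

In this sketch a report that shows the slot allocated under the same allocation id leaves the entry in place; the next retry would then be acknowledged by the TaskExecutor and clear it.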
There is one more problem, though: how do we prevent a stale request from the
ResourceManager to the TaskExecutor when the ResourceManager hasn't received
a reply from the TaskExecutor, but the TaskExecutor has already removed the slot
again (i.e. the task has finished)? The slot would be allocated although it is
no longer needed.
Note that sending back a current allocation list when declining a request
does not cover the case in which a slot has already been released again. The
TaskExecutor may have tried to decline a request and have failed. In the
meantime, the ResourceManager sends the same request again. This results in a
second (duplicate) slot allocation.
Again, the only solution for this problem seems to be to keep a list of
unconfirmed slot allocation removal requests at the TaskExecutor. The
ResourceManager has to acknowledge all slot allocation removals. The
TaskExecutor can then de-duplicate any slot requests for allocations it has
already removed but whose removal the ResourceManager has not yet confirmed.
Actually, it should suffice to have only one list with unconfirmed slot
allocation removals at the TaskExecutor. The ResourceManager doesn't need a
list to filter because it relies on the TaskExecutor to filter duplicate
requests correctly.
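A rough sketch of how the TaskExecutor-side filtering could look is given below. Everything in it is assumed for illustration: the class `SlotTable`, the `Reply` values, and the string ids are hypothetical, and the real implementation may differ substantially.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch; names and types are illustrative only. */
class SlotTable {

    enum Reply { ACK, DECLINE, IGNORED_STALE }

    // slotId -> allocationId currently occupying the slot
    private final Map<String, String> active = new HashMap<>();

    // allocation ids of freed slots whose removal the ResourceManager
    // has not confirmed yet
    private final Set<String> unconfirmedRemovals = new HashSet<>();

    /** Handles a (possibly retried) slot request from the ResourceManager. */
    Reply requestSlot(String slotId, String allocationId) {
        if (unconfirmedRemovals.contains(allocationId)) {
            // Old request for an allocation that was already freed: filter it.
            return Reply.IGNORED_STALE;
        }
        String current = active.get(slotId);
        if (current == null) {
            active.put(slotId, allocationId);
            return Reply.ACK;
        }
        // Same request received twice: simply acknowledge it again.
        return current.equals(allocationId) ? Reply.ACK : Reply.DECLINE;
    }

    /** Frees the slot and remembers its allocation id until the removal is confirmed. */
    void freeSlot(String slotId) {
        String allocationId = active.remove(slotId);
        if (allocationId != null) {
            unconfirmedRemovals.add(allocationId);
        }
    }

    /** The ResourceManager acknowledged the removal; the id can be forgotten. */
    void removalConfirmed(String allocationId) {
        unconfirmedRemovals.remove(allocationId);
    }
}
```

With this shape the ResourceManager needs no filter of its own: a retried request either gets acknowledged again or is dropped as stale by the TaskExecutor.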
**TL;DR**
I think we need to change the PR title :) Long story short, in addition to
the previously discussed changes, we need the ResourceManager to
confirm slot allocation removals by the TaskExecutor. The TaskExecutor has to
keep around previous allocation ids of freed slots to de-duplicate any old
incoming slot requests from the ResourceManager.
Thank you so much for your feedback. Please tell me if anything is unclear.
You're right that the protocol is quite complex.
> Implement slot allocation protocol with TaskExecutor
> ----------------------------------------------------
>
> Key: FLINK-4348
> URL: https://issues.apache.org/jira/browse/FLINK-4348
> Project: Flink
> Issue Type: Sub-task
> Components: Cluster Management
> Reporter: Kurt Young
> Assignee: Maximilian Michels
>
> When the slotManager finds a suitable slot in the free pool for a slot request,
> it marks the slot as occupied and then tells the taskExecutor to give
> the slot to the specified JobMaster.
> When a slot request is sent to the taskExecutor, it should contain the following
> parameters: AllocationID, JobID, slotID, resourceManagerLeaderSessionID.
> There are three possible responses from the taskExecutor; we
> discuss when each one occurs and how to handle it.
> 1. Ack request which means the taskExecutor gives the slot to the specified
> jobMaster as expected.
> 2. Decline request if the slot is already occupied by other AllocationID.
> 3. Timeout, which could be caused by loss of the request or response message
> or by slow network transfer.
> In the first case, the ResourceManager needs to do nothing. In the
> second and third cases, however, the ResourceManager needs to notify the slotManager;
> the slotManager first verifies and clears all previous allocation information for
> this slot request, then tries to find a suitable slot for the slot
> request again. This may cause duplicate allocations, e.g. the slot
> request to the TaskManager succeeds but the response is lost, so we
> may request a slot on another TaskManager. This results in two slots assigned to
> one request, but it can be handled by rejecting the registration at the
> JobMaster.
> There are still some questions that need further discussion.
> 1. Who sends the slotRequest to the taskExecutor, the SlotManager or the ResourceManager? I
> think it's better that the SlotManager delegates the RPC call to the ResourceManager
> whenever the SlotManager needs to communicate with the outside world. The ResourceManager
> knows which taskExecutor to send the request to based on the ResourceID. Besides, the
> RPC call used to request a slot from the taskExecutor should not be an
> RpcMethod, because we want only the SlotManager to have permission to call the
> method; other components, for example the JobMaster and TaskExecutor,
> should not be able to call this method directly.
> 2. If the JobMaster rejects the slot offer from a TaskExecutor, should the TaskExecutor
> notify the ResourceManager of the free slot immediately, or wait for the next
> heartbeat sync? The advantage of the first way is that the resourceManager's view
> is updated faster. The advantage of the second way is that it saves an RPC method in
> the ResourceManager.
> 3. There are two communication types. First, the slot request could be sent as
> an ask operation where the response is returned as a future. Second, the
> resourceManager sends the slot request in a fire-and-forget way and the response
> is returned by an RPC call. I prefer the first one because it is
> simpler and saves an RPC method in the ResourceManager (for the callback in the
> second way).
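
For reference, the three responses listed in the issue description (ack, decline, timeout) map naturally onto the "ask" style with a future. The sketch below is an assumption-laden illustration using only `java.util.concurrent`: the class `SlotRequestOutcome`, the `Boolean` reply (true for ack, false for decline), and the blocking wait are all simplifications for the example and are not the Flink RPC API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical sketch; not the Flink RPC API. */
class SlotRequestOutcome {

    enum Outcome { ACK, DECLINE, TIMEOUT }

    /**
     * Classifies the reply of an "ask" style slot request.
     * Assumption: the future completes with true (ack) or false (decline).
     * A missing or failed reply is treated like a timeout, so the caller
     * can retry in both cases.
     */
    static Outcome await(CompletableFuture<Boolean> reply, long timeoutMillis) {
        try {
            boolean accepted = reply.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return accepted ? Outcome.ACK : Outcome.DECLINE;
        } catch (TimeoutException | ExecutionException e) {
            return Outcome.TIMEOUT;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return Outcome.TIMEOUT;
        }
    }
}
```

A real endpoint would more likely attach a callback to the future than block on it; the blocking call is used here only to keep the example short.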
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)