Github user beyond1920 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2463#discussion_r77769044
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java ---
    @@ -89,32 +93,37 @@ public SlotManager(ResourceManagerGateway resourceManagerGateway) {
         * RPC's main thread to avoid race condition).
         *
         * @param request The detailed request of the slot
    +    * @return SlotRequestRegistered The confirmation message to be send to the caller
         */
    -   public void requestSlot(final SlotRequest request) {
    +   public SlotRequestRegistered requestSlot(final SlotRequest request) {
    +           final AllocationID allocationId = request.getAllocationId();
                if (isRequestDuplicated(request)) {
    -                   LOG.warn("Duplicated slot request, AllocationID:{}", request.getAllocationId());
    -                   return;
    +                   LOG.warn("Duplicated slot request, AllocationID:{}", allocationId);
    +                   return null;
                }
     
                // try to fulfil the request with current free slots
    -           ResourceSlot slot = chooseSlotToUse(request, freeSlots);
    +           final ResourceSlot slot = chooseSlotToUse(request, freeSlots);
                if (slot != null) {
                        LOG.info("Assigning SlotID({}) to AllocationID({}), JobID:{}", slot.getSlotId(),
    -                           request.getAllocationId(), request.getJobId());
    +                           allocationId, request.getJobId());
     
                        // record this allocation in bookkeeping
    -                   allocationMap.addAllocation(slot.getSlotId(), request.getAllocationId());
    +                   allocationMap.addAllocation(slot.getSlotId(), allocationId);
     
                        // remove selected slot from free pool
                        freeSlots.remove(slot.getSlotId());
     
    -                   // TODO: send slot request to TaskManager
    +                   slot.getTaskExecutorGateway()
    +                           .requestSlot(allocationId, leaderIdRegistry.getLeaderID());
    --- End diff --
    
    There are three possible responses from the taskExecutor:
    1. Ack: the taskExecutor gives the slot to the specified jobMaster as expected.
    2. Decline: the slot is already occupied by another AllocationID.
    3. Timeout: the request message or the response message was lost, or network transfer was slow.
    In the first case, the SlotManager does not need to do anything. In the second and third cases, however, the SlotManager should first verify and clear all previous allocation information for this slot request, and then try to find a suitable slot for the request again. I think we should add logic to handle these three possible responses from the taskExecutor.
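
    To make the suggestion concrete, here is a minimal, self-contained sketch of how the three responses could be handled. It is only an illustration: SlotResponseSketch, Response, assign and handleResponse are hypothetical names, not Flink APIs, and plain strings stand in for SlotID and AllocationID. It also assumes that a declined or timed-out slot is not put back into the free pool, since its real state is unknown at that point.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch; none of these names are Flink APIs. */
final class SlotResponseSketch {

    /** The three possible outcomes of a slot request sent to the taskExecutor. */
    enum Response { ACK, DECLINE, TIMEOUT }

    // slotId -> allocationId bookkeeping (stand-in for allocationMap)
    final Map<String, String> allocations = new HashMap<>();
    // ids of currently free slots (stand-in for freeSlots)
    final Deque<String> freeSlots = new ArrayDeque<>();

    /** Assigns the next free slot to the allocation, or returns null if none is free. */
    String assign(String allocationId) {
        String slotId = freeSlots.poll();
        if (slotId != null) {
            allocations.put(slotId, allocationId);
        }
        return slotId;
    }

    /**
     * Handles the taskExecutor's reply for a slot that was handed to allocationId.
     * Returns the slot the allocation ends up with, or null if no slot is available.
     */
    String handleResponse(String slotId, String allocationId, Response response) {
        switch (response) {
            case ACK:
                // Case 1: the slot was given to the jobMaster, nothing to do.
                return slotId;
            case DECLINE:
            case TIMEOUT:
                // Cases 2 and 3: verify that the bookkeeping entry still belongs
                // to this request, clear it, then look for another slot.
                allocations.remove(slotId, allocationId);
                // Assumption: the failed slot is NOT returned to freeSlots here.
                return assign(allocationId);
            default:
                throw new IllegalStateException("Unknown response: " + response);
        }
    }

    public static void main(String[] args) {
        SlotResponseSketch manager = new SlotResponseSketch();
        manager.freeSlots.add("slot-1");
        manager.freeSlots.add("slot-2");

        String first = manager.assign("alloc-A");
        String retried = manager.handleResponse(first, "alloc-A", Response.DECLINE);
        System.out.println(first + " declined, retried with " + retried);
    }
}
```

    The conditional remove(slotId, allocationId) is the "verify" step: the entry is cleared only if the slot is still recorded for this AllocationID, so a late timeout cannot wipe out an allocation that has since been given to another request.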

