[ https://issues.apache.org/jira/browse/FLINK-4348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15551229#comment-15551229 ]
ASF GitHub Bot commented on FLINK-4348:
---------------------------------------
Github user KurtYoung commented on a diff in the pull request:
https://github.com/apache/flink/pull/2571#discussion_r82132624
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java ---
@@ -136,53 +127,34 @@ public SlotRequestRegistered requestSlot(final SlotRequest request) {
             // record this allocation in bookkeeping
             allocationMap.addAllocation(slot.getSlotId(), allocationId);
             // remove selected slot from free pool
-            final ResourceSlot removedSlot = freeSlots.remove(slot.getSlotId());
-
-            final Future<SlotRequestReply> slotRequestReplyFuture =
-                slot.getTaskExecutorGateway().requestSlot(allocationId, leaderID, timeout);
-
-            slotRequestReplyFuture.handleAsync(new BiFunction<SlotRequestReply, Throwable, Object>() {
-                @Override
-                public Object apply(SlotRequestReply slotRequestReply, Throwable throwable) {
-                    if (throwable != null) {
-                        // we failed, put the slot and the request back again
-                        if (allocationMap.isAllocated(slot.getSlotId())) {
-                            // only re-add if the slot hasn't been removed in the meantime
-                            freeSlots.put(slot.getSlotId(), removedSlot);
-                        }
-                        pendingSlotRequests.put(allocationId, request);
-                    }
-                    return null;
-                }
-            }, resourceManagerServices.getExecutor());
+            freeSlots.remove(slot.getSlotId());
+
+            sendSlotRequest(slot, request);
         } else {
             LOG.info("Cannot fulfil slot request, try to allocate a new container for it, " +
                 "AllocationID:{}, JobID:{}", allocationId, request.getJobId());
-            Preconditions.checkState(resourceManagerServices != null,
+            Preconditions.checkState(rmServices != null,
                 "Attempted to allocate resources but no ResourceManagerServices set.");
-            resourceManagerServices.allocateResource(request.getResourceProfile());
+            rmServices.allocateResource(request.getResourceProfile());
             pendingSlotRequests.put(allocationId, request);
         }
-        return new SlotRequestRegistered(allocationId);
+        return new RMSlotRequestRegistered(allocationId);
     }
     /**
-     * Sync slot status with TaskManager's SlotReport.
+     * Notifies the SlotManager that a slot is available again after being allocated.
+     * @param slotID slot id of available slot
      */
-    public void updateSlotStatus(final SlotReport slotReport) {
-        for (SlotStatus slotStatus : slotReport.getSlotsStatus()) {
-            updateSlotStatus(slotStatus);
+    public void notifySlotAvailable(ResourceID resourceID, SlotID slotID) {
+        if (!allocationMap.isAllocated(slotID)) {
+            throw new IllegalStateException("Slot was not previously allocated but " +
+                "TaskManager reports it as available again");
         }
-    }
-
-    /**
-     * Registers a TaskExecutor
-     * @param resourceID TaskExecutor's ResourceID
-     * @param gateway TaskExcutor's gateway
-     */
-    public void registerTaskExecutor(ResourceID resourceID, TaskExecutorGateway gateway) {
-        this.taskManagerGateways.put(resourceID, gateway);
+        allocationMap.removeAllocation(slotID);
+        final Map<SlotID, ResourceSlot> slots = registeredSlots.get(resourceID);
+        ResourceSlot freeSlot = slots.get(slotID);
--- End diff --
It would be better to check whether the slot exists, in case the TM reports
an unknown free slot.
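
A check along these lines could look roughly like the sketch below. It is not the Flink code: SlotAvailabilitySketch, its String ids, and its boolean return value are hypothetical stand-ins for SlotManager, ResourceID/SlotID, and ResourceSlot. Ignoring an unknown report (rather than throwing or logging) is only one possible policy, chosen here to keep the example short.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in for the SlotManager bookkeeping; not the Flink classes. */
public class SlotAvailabilitySketch {
    // resourceId -> (slotId -> slot), mirroring registeredSlots in the diff
    private final Map<String, Map<String, String>> registeredSlots = new HashMap<>();
    // slot ids that currently have an allocation, mirroring allocationMap
    private final Set<String> allocatedSlots = new HashSet<>();
    // slot ids that are free to hand out again, mirroring freeSlots
    private final Map<String, String> freeSlots = new HashMap<>();

    public void registerSlot(String resourceId, String slotId) {
        registeredSlots
            .computeIfAbsent(resourceId, k -> new HashMap<>())
            .put(slotId, "slot:" + slotId);
    }

    public void allocate(String slotId) {
        allocatedSlots.add(slotId);
    }

    /** Returns true if the slot went back to the free pool, false if the report was ignored. */
    public boolean notifySlotAvailable(String resourceId, String slotId) {
        // Look the slot up first, so nothing is mutated for an unknown report.
        Map<String, String> slots = registeredSlots.get(resourceId);
        if (slots == null) {
            return false; // unknown TaskManager
        }
        String freeSlot = slots.get(slotId);
        if (freeSlot == null) {
            return false; // TaskManager is known, but it never registered this slot
        }
        if (!allocatedSlots.contains(slotId)) {
            throw new IllegalStateException("Slot was not previously allocated but "
                + "TaskManager reports it as available again");
        }
        allocatedSlots.remove(slotId);
        freeSlots.put(slotId, freeSlot);
        return true;
    }

    public boolean isFree(String slotId) {
        return freeSlots.containsKey(slotId);
    }
}
```

Doing both lookups before removing the allocation avoids a NullPointerException on slots.get(slotID) and keeps the allocation bookkeeping untouched when the report cannot be matched to a registered slot.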
> Implement slot allocation protocol with TaskExecutor
> ----------------------------------------------------
>
> Key: FLINK-4348
> URL: https://issues.apache.org/jira/browse/FLINK-4348
> Project: Flink
> Issue Type: Sub-task
> Components: Cluster Management
> Reporter: Kurt Young
> Assignee: Maximilian Michels
>
> When slotManager finds a proper slot in the free pool for a slot request,
> slotManager marks the slot as occupied, then tells the taskExecutor to give
> the slot to the specified JobMaster.
> When a slot request is sent to the taskExecutor, it should contain the
> following parameters: AllocationID, JobID, slotID,
> resourceManagerLeaderSessionID.
> There are three possible responses from the taskExecutor; below we discuss
> when each one occurs and how to handle it.
> 1. Ack: the taskExecutor gives the slot to the specified jobMaster as
> expected.
> 2. Decline: the slot is already occupied by another AllocationID.
> 3. Timeout: the request or response message was lost, or the network
> transfer was slow.
> In the first case, the ResourceManager needs to do nothing. In the second
> and third cases, however, the ResourceManager needs to notify the
> slotManager, which first verifies and clears all previous allocation
> information for this slot request and then tries again to find a suitable
> slot for it. This may cause duplicate allocations: for example, if the slot
> request to a TaskManager succeeds but the response is lost, we may request a
> slot on another TaskManager, so two slots end up assigned to one request.
> This can be handled by rejecting the registration at the JobMaster.
> Some questions still need further discussion.
> 1. Who sends the slotRequest to the taskExecutor, the SlotManager or the
> ResourceManager? I think it is better for the SlotManager to delegate the
> RPC call to the ResourceManager whenever the SlotManager needs to
> communicate with the outside world. The ResourceManager knows which
> taskExecutor to send the request to based on the ResourceID. In addition,
> the call used to request a slot from the taskExecutor should not be an
> RpcMethod, because only the SlotManager should be permitted to call it;
> other components, such as the JobMaster and TaskExecutor, should not be able
> to call it directly.
> 2. If the JobMaster rejects the slot offer from a TaskExecutor, should the
> TaskExecutor notify the ResourceManager of the free slot immediately, or
> wait for the next heartbeat sync? The advantage of the first approach is
> that the resourceManager’s view is updated faster. The advantage of the
> second is that it saves an RPC method in the ResourceManager.
> 3. There are two possible communication styles. First, the slot request
> could be sent as an ask operation whose response is returned as a future.
> Second, the resourceManager could send the slot request in a
> fire-and-forget way, with the response returned by a separate RPC call. I
> prefer the first because it is simpler and saves an RPC method in the
> ResourceManager (the callback needed in the second approach).
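
The ask-style variant preferred in the description, together with the three response cases (ack, decline, timeout), might be sketched as follows. This is an illustration under stated assumptions rather than Flink's RPC API: SlotRequestSketch, Reply, Action, and onReply are hypothetical names, and the sketch assumes the RPC layer completes the future exceptionally (here with a TimeoutException) when no reply arrives in time.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

/** Hypothetical sketch of handling a slot request reply; not Flink's RPC classes. */
public class SlotRequestSketch {
    /** What the taskExecutor can answer. */
    public enum Reply { ACK, DECLINE }

    /** What the ResourceManager does next. */
    public enum Action { NONE, RETRY_ALLOCATION }

    /**
     * Maps the outcome of an ask-style slot request to a follow-up action.
     * Assumption: a timeout surfaces as an exceptionally completed future.
     */
    public static CompletableFuture<Action> onReply(CompletableFuture<Reply> replyFuture) {
        return replyFuture.handle((reply, failure) -> {
            if (failure != null) {
                // Case 3: timeout or lost message; clear the allocation and retry.
                return Action.RETRY_ALLOCATION;
            }
            // Case 1: ack needs no further work. Case 2: decline triggers a retry.
            return reply == Reply.ACK ? Action.NONE : Action.RETRY_ALLOCATION;
        });
    }

    public static void main(String[] args) {
        CompletableFuture<Reply> timedOut = new CompletableFuture<>();
        timedOut.completeExceptionally(new TimeoutException("no reply from taskExecutor"));

        System.out.println(onReply(CompletableFuture.completedFuture(Reply.ACK)).join());
        System.out.println(onReply(CompletableFuture.completedFuture(Reply.DECLINE)).join());
        System.out.println(onReply(timedOut).join());
    }
}
```

Because the reply travels back on the future, no extra callback method is needed on the ResourceManager, which is the saving the description attributes to the first style.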