[
https://issues.apache.org/jira/browse/FLINK-5810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15928747#comment-15928747
]
ASF GitHub Bot commented on FLINK-5810:
---------------------------------------
Github user StephanEwen commented on a diff in the pull request:
https://github.com/apache/flink/pull/3394#discussion_r106515494
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
---
@@ -21,519 +21,897 @@
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.clusterframework.types.ResourceSlot;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.TaskManagerSlot;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.concurrent.BiFunction;
+import org.apache.flink.runtime.concurrent.CompletableFuture;
import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerServices;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.resourcemanager.SlotRequest;
-import
org.apache.flink.runtime.resourcemanager.messages.jobmanager.RMSlotRequestRegistered;
-import
org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRejected;
-import
org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestReply;
-import
org.apache.flink.runtime.resourcemanager.registration.TaskExecutorRegistration;
+import
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+import
org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.SlotStatus;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import
org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
+import
org.apache.flink.runtime.taskexecutor.exceptions.SlotOccupiedException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
+import java.util.Objects;
+import java.util.UUID;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
/**
- * SlotManager is responsible for receiving slot requests and do slot
allocations. It allows to request
- * slots from registered TaskManagers and issues container allocation
requests in case of there are not
- * enough available slots. Besides, it should sync its slot allocation
with TaskManager's heartbeat.
- * <p>
- * The main operation principle of SlotManager is:
- * <ul>
- * <li>1. All slot allocation status should be synced with TaskManager,
which is the ground truth.</li>
- * <li>2. All slots that have registered must be tracked, either by free
pool or allocated pool.</li>
- * <li>3. All slot requests will be handled by best efforts, there is no
guarantee that one request will be
- * fulfilled in time or correctly allocated. Conflicts or timeout or some
special error will happen, it should
- * be handled outside SlotManager. SlotManager will make each decision
based on the information it currently
- * holds.</li>
- * </ul>
- * <b>IMPORTANT:</b> This class is <b>Not Thread-safe</b>.
+ * The slot manager is responsible for maintaining a view on all
registered task manager slots,
+ * their allocation and all pending slot requests. Whenever a new slot is
registered or an
+ * allocated slot is freed, then it tries to fulfill another pending slot
request. Whenever there
+ * are not enough slots available the slot manager will notify the
resource manager about it via
+ * {@link ResourceManagerActions#allocateResource(ResourceProfile)}.
+ *
+ * In order to free resources and avoid resource leaks, idling task
managers (task managers whose
+ * slots are currently not used) and not fulfilled pending slot requests
time out triggering their
+ * release and failure, respectively.
*/
-public abstract class SlotManager {
+public class SlotManager implements AutoCloseable {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(SlotManager.class);
+
+ /** Scheduled executor for timeouts */
+ private final ScheduledExecutor scheduledExecutor;
+
+ /** Timeout for slot requests to the task manager */
+ private final Time taskManagerRequestTimeout;
+
+ /** Timeout after which an allocation is discarded */
+ private final Time slotRequestTimeout;
+
+ /** Timeout after which an unused TaskManager is released */
+ private final Time taskManagerTimeout;
+
+ /** Map for all registered slots */
+ private final HashMap<SlotID, TaskManagerSlot> slots;
- protected final Logger LOG = LoggerFactory.getLogger(getClass());
+ /** Index of all currently free slots */
+ private final LinkedHashMap<SlotID, TaskManagerSlot> freeSlots;
- /** The Resource allocation provider */
- protected final ResourceManagerServices rmServices;
+ /** All currently registered task managers */
+ private final HashMap<InstanceID, TaskManagerRegistration>
taskManagerRegistrations;
- /** All registered task managers with ResourceID and gateway. */
- private final Map<ResourceID, TaskExecutorRegistration> taskManagers;
+ /** Map of fulfilled and active allocations for request deduplication
purposes */
+ private final HashMap<AllocationID, SlotID> fulfilledSlotRequests;
- /** All registered slots, including free and allocated slots */
- private final Map<ResourceID, Map<SlotID, ResourceSlot>>
registeredSlots;
+ /** Map of pending/unfulfilled slot allocation requests */
+ private final HashMap<AllocationID, PendingSlotRequest>
pendingSlotRequests;
- /** All pending slot requests, waiting available slots to fulfil */
- private final Map<AllocationID, SlotRequest> pendingSlotRequests;
+ /** Leader id of the containing component */
+ private UUID leaderId;
- /** All free slots that can be used to be allocated */
- private final Map<SlotID, ResourceSlot> freeSlots;
+ /** Executor for future callbacks which have to be "synchronized" */
+ private Executor mainThreadExecutor;
- /** All allocations, we can lookup allocations either by SlotID or
AllocationID */
- private final AllocationMap allocationMap;
+ /** Callbacks for resource (de-)allocations */
+ private ResourceManagerActions resourceManagerActions;
- private final Time timeout;
+ /** True iff the component has been started */
+ private boolean started;
- public SlotManager(ResourceManagerServices rmServices) {
- this.rmServices = checkNotNull(rmServices);
- this.registeredSlots = new HashMap<>(16);
- this.pendingSlotRequests = new LinkedHashMap<>(16);
- this.freeSlots = new HashMap<>(16);
- this.allocationMap = new AllocationMap();
- this.taskManagers = new HashMap<>();
- this.timeout = Time.seconds(10);
+ public SlotManager(
+ ScheduledExecutor scheduledExecutor,
+ Time taskManagerRequestTimeout,
+ Time slotRequestTimeout,
+ Time taskManagerTimeout) {
+ this.scheduledExecutor =
Preconditions.checkNotNull(scheduledExecutor);
+ this.taskManagerRequestTimeout =
Preconditions.checkNotNull(taskManagerRequestTimeout);
+ this.slotRequestTimeout =
Preconditions.checkNotNull(slotRequestTimeout);
+ this.taskManagerTimeout =
Preconditions.checkNotNull(taskManagerTimeout);
+
+ slots = new HashMap<>(16);
+ freeSlots = new LinkedHashMap<>(16);
+ taskManagerRegistrations = new HashMap<>(4);
+ fulfilledSlotRequests = new HashMap<>(16);
+ pendingSlotRequests = new HashMap<>(16);
+
+ leaderId = null;
+ resourceManagerActions = null;
+ started = false;
}
- //
------------------------------------------------------------------------
- // slot managements
- //
------------------------------------------------------------------------
+ //
---------------------------------------------------------------------------------------------
+ // Component lifecycle methods
+ //
---------------------------------------------------------------------------------------------
/**
- * Request a slot with requirements, we may either fulfill the request
or pending it. Trigger container
- * allocation if we don't have enough resource. If we have free slot
which can match the request, record
- * this allocation and forward the request to TaskManager through
ResourceManager (we want this done by
- * RPC's main thread to avoid race condition).
+ * Starts the slot manager with the given leader id and resource
manager actions.
*
- * @param request The detailed request of the slot
- * @return RMSlotRequestRegistered The confirmation message to be send
to the caller
+ * @param newLeaderId to use for communication with the task managers
+ * @param newResourceManagerActions to use for resource (de-)allocations
+ */
+ public void start(UUID newLeaderId, Executor newMainThreadExecutor,
ResourceManagerActions newResourceManagerActions) {
+ leaderId = Preconditions.checkNotNull(newLeaderId);
+ mainThreadExecutor =
Preconditions.checkNotNull(newMainThreadExecutor);
+ resourceManagerActions =
Preconditions.checkNotNull(newResourceManagerActions);
+
+ started = true;
+ }
+
+ /**
+ * Suspends the component. This clears the internal state of the slot
manager.
*/
- public RMSlotRequestRegistered requestSlot(final SlotRequest request) {
- final AllocationID allocationId = request.getAllocationId();
- if (isRequestDuplicated(request)) {
- LOG.warn("Duplicated slot request, AllocationID:{}",
allocationId);
- return new RMSlotRequestRegistered(allocationId);
+ public void suspend() {
+ for (PendingSlotRequest pendingSlotRequest :
pendingSlotRequests.values()) {
+ cancelPendingSlotRequest(pendingSlotRequest);
}
- // try to fulfil the request with current free slots
- final ResourceSlot slot = chooseSlotToUse(request, freeSlots);
- if (slot != null) {
- LOG.info("Assigning SlotID({}) to AllocationID({}),
JobID:{}", slot.getSlotId(),
- allocationId, request.getJobId());
+ pendingSlotRequests.clear();
+
+ HashSet<InstanceID> registeredTaskManagers = new
HashSet<>(taskManagerRegistrations.keySet());
+
+ for (InstanceID registeredTaskManager : registeredTaskManagers)
{
+ unregisterTaskManager(registeredTaskManager);
+ }
+
+ leaderId = null;
+ resourceManagerActions = null;
+ started = false;
+ }
+
+ /**
+ * Closes the slot manager.
+ *
+ * @throws Exception if the close operation fails
+ */
+ @Override
+ public void close() throws Exception {
+ suspend();
+ }
+
+ //
---------------------------------------------------------------------------------------------
+ // Public API
+ //
---------------------------------------------------------------------------------------------
+
+ /**
+ * Requests a slot with the respective resource profile.
+ *
+ * @param slotRequest specifying the requested slot specs
+ * @return true if the slot request was registered; false if the
request is a duplicate
+ * @throws SlotManagerException if the slot request failed (e.g. not
enough resources left)
+ */
+ public boolean registerSlotRequest(SlotRequest slotRequest) throws
SlotManagerException {
+ checkInit();
- // record this allocation in bookkeeping
- allocationMap.addAllocation(slot.getSlotId(),
allocationId);
- // remove selected slot from free pool
- freeSlots.remove(slot.getSlotId());
+ if (checkDuplicateRequest(slotRequest.getAllocationId())) {
+ LOG.debug("Ignoring a duplicate slot request with
allocation id {}.", slotRequest.getAllocationId());
- sendSlotRequest(slot, request);
+ return false;
} else {
- LOG.info("Cannot fulfil slot request, try to allocate a
new container for it, " +
- "AllocationID:{}, JobID:{}", allocationId,
request.getJobId());
- Preconditions.checkState(rmServices != null,
- "Attempted to allocate resources but no
ResourceManagerServices set.");
-
rmServices.allocateResource(request.getResourceProfile());
- pendingSlotRequests.put(allocationId, request);
+ PendingSlotRequest pendingSlotRequest = new
PendingSlotRequest(slotRequest);
+
+ pendingSlotRequests.put(slotRequest.getAllocationId(),
pendingSlotRequest);
+
+ try {
+ internalRequestSlot(pendingSlotRequest);
+ } catch (ResourceManagerException e) {
+ // requesting the slot failed --> remove
pending slot request
+
pendingSlotRequests.remove(slotRequest.getAllocationId());
+
+ throw new SlotManagerException("Could not
fulfill slot request " + slotRequest.getAllocationId() + '.', e);
+ }
+
+ return true;
}
+ }
- return new RMSlotRequestRegistered(allocationId);
+ /**
+ * Cancels and removes a pending slot request with the given allocation
id. If there is no such
+ * pending request, then nothing is done.
+ *
+ * @param allocationId identifying the pending slot request
+ * @return True if a pending slot request was found; otherwise false
+ */
+ public boolean unregisterSlotRequest(AllocationID allocationId) {
+ checkInit();
+
+ PendingSlotRequest pendingSlotRequest =
pendingSlotRequests.remove(allocationId);
+
+ if (null != pendingSlotRequest) {
+ cancelPendingSlotRequest(pendingSlotRequest);
+
+ return true;
+ } else {
+ LOG.debug("No pending slot request with allocation id
{} found.", allocationId);
+
+ return false;
+ }
}
/**
- * Notifies the SlotManager that a slot is available again after being
allocated.
- * @param slotID slot id of available slot
+ * Registers a new task manager at the slot manager. This will make the
task managers slots
+ * known and, thus, available for allocation.
+ *
+ * @param taskExecutorConnection for the new task manager
+ * @param initialSlotReport for the new task manager
*/
- public void notifySlotAvailable(ResourceID resourceID, SlotID slotID) {
- if (!allocationMap.isAllocated(slotID)) {
- throw new IllegalStateException("Slot was not
previously allocated but " +
- "TaskManager reports it as available again");
+ public void registerTaskManager(final TaskExecutorConnection
taskExecutorConnection, SlotReport initialSlotReport) {
+ checkInit();
+
+ // we identify task managers by their instance id
+ if
(!taskManagerRegistrations.containsKey(taskExecutorConnection.getInstanceID()))
{
+ TaskManagerRegistration taskManagerRegistration = new
TaskManagerRegistration(taskExecutorConnection);
+
taskManagerRegistrations.put(taskExecutorConnection.getInstanceID(),
taskManagerRegistration);
}
- allocationMap.removeAllocation(slotID);
- final Map<SlotID, ResourceSlot> slots =
registeredSlots.get(resourceID);
- ResourceSlot freeSlot = slots.get(slotID);
- if (freeSlot == null) {
- throw new IllegalStateException("Slot was not
registered with SlotManager but " +
- "TaskManager reported it to be available.");
+
+ reportSlotStatus(taskExecutorConnection.getInstanceID(),
initialSlotReport);
+ }
+
+ /**
+ * Unregisters the task manager identified by the given instance id and
its associated slots
+ * from the slot manager.
+ *
+ * @param instanceId identifying the task manager to unregister
+ * @return True if there existed a registered task manager with the
given instance id
+ */
+ public boolean unregisterTaskManager(InstanceID instanceId) {
+ checkInit();
+
+ TaskManagerRegistration taskManagerRegistration =
taskManagerRegistrations.remove(instanceId);
+
+ if (null != taskManagerRegistration) {
+ removeSlots(taskManagerRegistration.getSlots());
+
+ taskManagerRegistration.cancelTimeout();
+
+ return true;
+ } else {
+ LOG.debug("There is no task manager registered with
instance ID {}. Ignoring this message.", instanceId);
+
+ return false;
}
- handleFreeSlot(freeSlot);
}
/**
- * The slot request to TaskManager may be either failed by rpc
communication (timeout, network error, etc.)
- * or really rejected by TaskManager. We shall retry this request by:
- * <ul>
- * <li>1. verify and clear all the previous allocate information for
this request
- * <li>2. try to request slot again
- * </ul>
- * <p>
- * This may cause some duplicate allocation, e.g. the slot request to
TaskManager is successful but the response
- * is lost somehow, so we may request a slot in another TaskManager,
this causes two slots assigned to one request,
- * but it can be taken care of by rejecting registration at JobManager.
+ * Reports the current slot allocations for a task manager identified
by the given instance id.
*
- * @param originalRequest The original slot request
- * @param slotId The target SlotID
+ * @param instanceId identifying the task manager for which to report
the slot status
+ * @param slotReport containing the status for all of its slots
+ * @return true if the slot status has been updated successfully,
otherwise false
*/
- void handleSlotRequestFailedAtTaskManager(final SlotRequest
originalRequest, final SlotID slotId) {
- final AllocationID originalAllocationId =
originalRequest.getAllocationId();
- LOG.info("Slot request failed at TaskManager, SlotID:{},
AllocationID:{}, JobID:{}",
- slotId, originalAllocationId,
originalRequest.getJobId());
-
- if (allocationMap.isAllocated(slotId)) {
- final AllocationID expectedAllocationId =
allocationMap.getAllocationID(slotId);
-
- // check whether we have an agreement on whom this slot
belongs to
- if (originalAllocationId.equals(expectedAllocationId)) {
- LOG.info("De-allocate this request and retry");
-
allocationMap.removeAllocation(expectedAllocationId);
-
pendingSlotRequests.put(originalRequest.getAllocationId(), originalRequest);
- ResourceSlot slot =
checkNotNull(getRegisteredSlot(slotId));
- // treat this slot as empty and retry with a
different request
- handleFreeSlot(slot);
+ public boolean reportSlotStatus(InstanceID instanceId, SlotReport
slotReport) {
+ checkInit();
+
+ TaskManagerRegistration taskManagerRegistration =
taskManagerRegistrations.get(instanceId);
+
+ if (null != taskManagerRegistration) {
+ HashSet<SlotID> slotsToRemove = new
HashSet<>(taskManagerRegistration.getSlots());
+ boolean idle = true;
+
+ for (SlotStatus slotStatus : slotReport) {
+ if
(slotsToRemove.remove(slotStatus.getSlotID())) {
+ // slot which was already registered
+ updateSlot(slotStatus.getSlotID(),
slotStatus.getAllocationID());
+ } else {
+ // new slot
+ registerSlot(
+ instanceId,
+ slotStatus.getSlotID(),
+ slotStatus.getAllocationID(),
+ slotStatus.getResourceProfile(),
+
taskManagerRegistration.getTaskManagerConnection());
+ }
+
+ TaskManagerSlot slot =
slots.get(slotStatus.getSlotID());
+
+ idle &= slot.isFree();
+ }
+
+ // remove the slots for which we haven't received a
slot status message
+ removeSlots(slotsToRemove);
+
+ if (idle) {
+ // no slot of this task manager is being used
--> register timer to free this resource
+
registerTaskManagerTimeout(taskManagerRegistration);
+ }
+
+ return true;
+ } else {
+ LOG.debug("Received slot report for unknown task
manager with instance id {}. Ignoring this report.", instanceId);
+
+ return false;
+ }
+ }
+
+ /**
+ * Free the given slot from the given allocation. If the slot is still
allocated by the given
+ * allocation id, then the slot will be marked as free and will be
subject to new slot requests.
+ *
+ * @param slotId identifying the slot to free
+ * @param allocationId with which the slot is presumably allocated
+ */
+ public void freeSlot(SlotID slotId, AllocationID allocationId) {
+ checkInit();
+
+ TaskManagerSlot slot = slots.get(slotId);
+
+ if (null != slot) {
+ if (slot.isAllocated()) {
+ if (Objects.equals(allocationId,
slot.getAllocationId())) {
+ // free the slot
+ slot.setAllocationId(null);
+
fulfilledSlotRequests.remove(allocationId);
+
+ if (slot.isFree()) {
+ handleFreeSlot(slot);
+ }
+
+ TaskManagerRegistration
taskManagerRegistration = taskManagerRegistrations.get(slot.getInstanceId());
+
+ if (null != taskManagerRegistration &&
!anySlotUsed(taskManagerRegistration.getSlots())) {
+
registerTaskManagerTimeout(taskManagerRegistration);
+ }
+ } else {
+ LOG.debug("Received request to free
slot {} with expected allocation id {}, " +
+ "but actual allocation id {}
differs. Ignoring the request.", slotId, allocationId, slot.getAllocationId());
+ }
} else {
- LOG.error("Slot request failed for slot {} with
allocation id {}:" +
- " Allocation id did not match
the expected allocation id {}.",
- slotId, originalAllocationId,
expectedAllocationId);
+ LOG.debug("Slot {} has not been allocated.",
allocationId);
}
} else {
- LOG.error("Slot request failed for slot {} with
allocation id {}: " +
- "Slot was not previously registered.",
- slotId, originalAllocationId);
+ LOG.debug("Trying to free a slot {} which has not been
registered. Ignoring this message.", slotId);
}
}
+ //
---------------------------------------------------------------------------------------------
+ // Behaviour methods
+ //
---------------------------------------------------------------------------------------------
+
/**
- * Registers a TaskExecutor
- * @param resourceID TaskExecutor's ResourceID
- * @param registration TaskExecutor's registration
- * @param slotReport TaskExecutor's free and allocated slots
+ * Finds a matching slot request for a given resource profile. If there
is no such request,
+ * the method returns null.
+ *
+ * Note: If you want to change the behaviour of the slot manager wrt
slot allocation and
+ * request fulfillment, then you should override this method.
+ *
+ * @param slotResourceProfile defining the resources of an available
slot
+ * @return A matching slot request which can be deployed in a slot with
the given resource
+ * profile. Null if there is no such slot request pending.
*/
- public void registerTaskExecutor(
- ResourceID resourceID,
- TaskExecutorRegistration registration,
- SlotReport slotReport) {
+ protected PendingSlotRequest findMatchingRequest(ResourceProfile
slotResourceProfile) {
- if (taskManagers.get(resourceID) != null) {
- notifyTaskManagerFailure(resourceID);
+ for (PendingSlotRequest pendingSlotRequest :
pendingSlotRequests.values()) {
+ if (!pendingSlotRequest.isAssigned() &&
slotResourceProfile.isMatching(pendingSlotRequest.getResourceProfile())) {
+ return pendingSlotRequest;
+ }
}
- this.taskManagers.put(resourceID, registration);
+ return null;
+ }
+
+ /**
+ * Finds a matching slot for a given resource profile. A matching slot
has at least as many
+ * resources available as the given resource profile. If there is no
such slot available, then
+ * the method returns null.
+ *
+ * Note: If you want to change the behaviour of the slot manager wrt
slot allocation and
+ * request fulfillment, then you should override this method.
+ *
+ * @param requestResourceProfile specifying the resource requirements
for a slot request
+ * @return A matching slot which fulfills the given resource profile.
Null if there is no such
+ * slot available.
+ */
+ protected TaskManagerSlot findMatchingSlot(ResourceProfile
requestResourceProfile) {
+ Iterator<Map.Entry<SlotID, TaskManagerSlot>> iterator =
freeSlots.entrySet().iterator();
+
+ while (iterator.hasNext()) {
+ TaskManagerSlot taskManagerSlot =
iterator.next().getValue();
- for (SlotStatus slotStatus : slotReport.getSlotsStatus()) {
- final SlotID slotId = slotStatus.getSlotID();
+ // sanity check
+ Preconditions.checkState(taskManagerSlot.isFree());
- final TaskExecutorRegistration taskExecutorRegistration
= taskManagers.get(slotId.getResourceID());
- if (taskExecutorRegistration == null) {
- LOG.info("Received SlotStatus but ResourceID {}
is unknown to the SlotManager",
- slotId.getResourceID());
- return;
+ if
(taskManagerSlot.getResourceProfile().isMatching(requestResourceProfile)) {
+ iterator.remove();
+ return taskManagerSlot;
}
+ }
- final ResourceSlot slot = new ResourceSlot(slotId,
slotStatus.getProfiler(), taskExecutorRegistration);
+ return null;
+ }
- registerNewSlot(slot);
- LOG.info("New slot appeared, SlotID:{},
AllocationID:{}", slotId, slotStatus.getAllocationID());
+ //
---------------------------------------------------------------------------------------------
+ // Internal slot operations
+ //
---------------------------------------------------------------------------------------------
- if (slotStatus.getAllocationID() != null) {
- // slot in use, record this in bookkeeping
- allocationMap.addAllocation(slotId,
slotStatus.getAllocationID());
- } else {
+ /**
+ * Registers a slot for the given task manager at the slot manager. The
task manager is
+ * identified by the given instance id and the slot is identified by
the given slot id. The
+ * given resource profile defines the available resources for the slot.
The task manager
+ * connection can be used to communicate with the task manager.
+ *
+ * @param instanceId identifying the task manager on which the slot
lives
+ * @param slotId identifying the slot on the task manager
+ * @param allocationId which is currently deployed in the slot
+ * @param resourceProfile of the slot
+ * @param taskManagerConnection to communicate with the remote task
manager
+ */
+ private void registerSlot(
+ InstanceID instanceId,
+ SlotID slotId,
+ AllocationID allocationId,
+ ResourceProfile resourceProfile,
+ TaskExecutorConnection taskManagerConnection) {
+ TaskManagerRegistration taskManagerRegistration =
taskManagerRegistrations.get(instanceId);
+
+ if (null != taskManagerRegistration) {
+ TaskManagerSlot slot = new TaskManagerSlot(
+ slotId,
+ resourceProfile,
+ taskManagerConnection,
+ allocationId);
+
+ slots.put(slotId, slot);
+
+ taskManagerRegistration.addSlot(slotId);
+
+ if (slot.isFree()) {
handleFreeSlot(slot);
}
+
+ if (slot.isAllocated()) {
+
fulfilledSlotRequests.put(slot.getAllocationId(), slotId);
+ }
+ } else {
+ LOG.debug("Trying to register slot for unknown task
manager with instance id {}.", instanceId);
}
}
/**
- * Callback for TaskManager failures. In case that a TaskManager fails,
we have to clean up all its slots.
+ * Updates a slot with the given allocation id.
*
- * @param resourceId The ResourceID of the TaskManager
+ * @param slotId to update
+ * @param allocationId specifying the current allocation of the slot
*/
- public void notifyTaskManagerFailure(final ResourceID resourceId) {
- LOG.info("Resource:{} been notified failure", resourceId);
- taskManagers.remove(resourceId);
- final Map<SlotID, ResourceSlot> slotIdsToRemove =
registeredSlots.remove(resourceId);
- if (slotIdsToRemove != null) {
- for (SlotID slotId : slotIdsToRemove.keySet()) {
- LOG.info("Removing Slot: {} upon resource
failure", slotId);
- if (freeSlots.containsKey(slotId)) {
- freeSlots.remove(slotId);
- } else if (allocationMap.isAllocated(slotId)) {
- allocationMap.removeAllocation(slotId);
- } else {
- LOG.error("BUG! {} is neither in free
pool nor in allocated pool", slotId);
+ private void updateSlot(SlotID slotId, AllocationID allocationId) {
+ TaskManagerSlot slot = slots.get(slotId);
+
+ if (null != slot) {
+ // we assume the given allocation id to be the ground
truth (coming from the TM)
+ slot.setAllocationId(allocationId);
+
+ if (null != allocationId) {
+ if (slot.hasPendingSlotRequest()){
+ // we have a pending slot request -->
check whether we have to reject it
+ PendingSlotRequest pendingSlotRequest =
slot.getAssignedSlotRequest();
+
+ if
(Objects.equals(pendingSlotRequest.getAllocationId(), allocationId)) {
+ // we can cancel the slot
request because it has been fulfilled
+
cancelPendingSlotRequest(pendingSlotRequest);
+
+ // remove the pending slot
request, since it has been completed
+
pendingSlotRequests.remove(pendingSlotRequest.getAllocationId());
+ } else {
+ // this will try to find a new
slot for the request
+ rejectPendingSlotRequest(
+ pendingSlotRequest,
+ new Exception("Task
manager reported slot " + slotId + " being already allocated."));
+ }
+
+ slot.setAssignedSlotRequest(null);
+ }
+
+ fulfilledSlotRequests.put(allocationId, slotId);
+
+ TaskManagerRegistration taskManagerRegistration
= taskManagerRegistrations.get(slot.getInstanceId());
+
+ if (null != taskManagerRegistration) {
+ // disable any registered time out for
the task manager
+ taskManagerRegistration.cancelTimeout();
}
}
+ } else {
+ LOG.debug("Trying to update unknown slot with slot id
{}.", slotId);
}
}
- //
------------------------------------------------------------------------
- // internal behaviors
- //
------------------------------------------------------------------------
-
/**
- * When we have a free slot, try to fulfill the pending request first.
If any request can be fulfilled,
- * record this allocation in bookkeeping and send slot request to
TaskManager, else we just add this slot
- * to the free pool.
+ * Tries to allocate a slot for the given slot request. If there is no
slot available, the
+ * resource manager is informed to allocate more resources and a
timeout for the request is
+ * registered.
*
- * @param freeSlot The free slot
+ * @param pendingSlotRequest to allocate a slot for
+ * @throws ResourceManagerException if the resource manager cannot
allocate more resource
*/
- private void handleFreeSlot(final ResourceSlot freeSlot) {
- SlotRequest chosenRequest = chooseRequestToFulfill(freeSlot,
pendingSlotRequests);
+ private void internalRequestSlot(PendingSlotRequest pendingSlotRequest)
throws ResourceManagerException {
+ TaskManagerSlot taskManagerSlot =
findMatchingSlot(pendingSlotRequest.getResourceProfile());
- if (chosenRequest != null) {
- final AllocationID allocationId =
chosenRequest.getAllocationId();
- final SlotRequest slotRequest =
pendingSlotRequests.remove(allocationId);
+ if (taskManagerSlot != null) {
+ allocateSlot(taskManagerSlot, pendingSlotRequest);
+ } else {
+ final UUID timeoutIdentifier = UUID.randomUUID();
+ final AllocationID allocationId =
pendingSlotRequest.getAllocationId();
+
+ // register timeout for slot request
+ ScheduledFuture<?> timeoutFuture =
scheduledExecutor.schedule(new Runnable() {
+ @Override
+ public void run() {
+ mainThreadExecutor.execute(new
Runnable() {
+ @Override
+ public void run() {
+
timeoutSlotRequest(allocationId, timeoutIdentifier);
+ }
+ });
+ }
+ }, slotRequestTimeout.toMilliseconds(),
TimeUnit.MILLISECONDS);
- LOG.info("Assigning SlotID({}) to AllocationID({}),
JobID:{}", freeSlot.getSlotId(),
- allocationId, chosenRequest.getJobId());
- allocationMap.addAllocation(freeSlot.getSlotId(),
allocationId);
+ pendingSlotRequest.registerTimeout(timeoutFuture,
timeoutIdentifier);
- sendSlotRequest(freeSlot, slotRequest);
- } else {
- freeSlots.put(freeSlot.getSlotId(), freeSlot);
+
resourceManagerActions.allocateResource(pendingSlotRequest.getResourceProfile());
}
}
- private void sendSlotRequest(final ResourceSlot freeSlot, final
SlotRequest slotRequest) {
+ /**
+ * Allocates the given slot for the given slot request. This entails
sending a registration
+ * message to the task manager and treating failures.
+ *
+ * @param taskManagerSlot to allocate for the given slot request
+ * @param pendingSlotRequest to allocate the given slot for
+ */
+ private void allocateSlot(TaskManagerSlot taskManagerSlot,
PendingSlotRequest pendingSlotRequest) {
+ TaskExecutorConnection taskExecutorConnection =
taskManagerSlot.getTaskManagerConnection();
+ TaskExecutorGateway gateway =
taskExecutorConnection.getTaskExecutorGateway();
+
+ final CompletableFuture<Acknowledge> completableFuture = new
FlinkCompletableFuture<>();
+ final AllocationID allocationId =
pendingSlotRequest.getAllocationId();
+ final SlotID slotId = taskManagerSlot.getSlotId();
+
+ taskManagerSlot.setAssignedSlotRequest(pendingSlotRequest);
+ pendingSlotRequest.setRequestFuture(completableFuture);
+
+ // RPC call to the task manager
+ Future<Acknowledge> requestFuture = gateway.requestSlot(
+ slotId,
+ pendingSlotRequest.getJobId(),
+ allocationId,
+ pendingSlotRequest.getTargetAddress(),
+ leaderId,
+ taskManagerRequestTimeout);
+
+ requestFuture.handle(new BiFunction<Acknowledge, Throwable,
Void>() {
--- End diff --
This `BiFunction` seems to be an *identity* function. Can we just skip the
`completableFuture` and define the proper handler directly on the
`requestFuture`?
> Harden SlotManager
> ------------------
>
> Key: FLINK-5810
> URL: https://issues.apache.org/jira/browse/FLINK-5810
> Project: Flink
> Issue Type: Sub-task
> Components: Distributed Coordination
> Affects Versions: 1.3.0
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
>
> Harden the {{SlotManager}} logic to better cope with lost messages.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)