[
https://issues.apache.org/jira/browse/FLINK-5810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15928748#comment-15928748
]
ASF GitHub Bot commented on FLINK-5810:
---------------------------------------
Github user StephanEwen commented on a diff in the pull request:
https://github.com/apache/flink/pull/3394#discussion_r106512992
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
---
@@ -21,519 +21,897 @@
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.clusterframework.types.ResourceSlot;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.TaskManagerSlot;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.concurrent.BiFunction;
+import org.apache.flink.runtime.concurrent.CompletableFuture;
import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerServices;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.resourcemanager.SlotRequest;
-import
org.apache.flink.runtime.resourcemanager.messages.jobmanager.RMSlotRequestRegistered;
-import
org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRejected;
-import
org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestReply;
-import
org.apache.flink.runtime.resourcemanager.registration.TaskExecutorRegistration;
+import
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+import
org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.SlotStatus;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import
org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
+import
org.apache.flink.runtime.taskexecutor.exceptions.SlotOccupiedException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
+import java.util.Objects;
+import java.util.UUID;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
/**
- * SlotManager is responsible for receiving slot requests and do slot
allocations. It allows to request
- * slots from registered TaskManagers and issues container allocation
requests in case of there are not
- * enough available slots. Besides, it should sync its slot allocation
with TaskManager's heartbeat.
- * <p>
- * The main operation principle of SlotManager is:
- * <ul>
- * <li>1. All slot allocation status should be synced with TaskManager,
which is the ground truth.</li>
- * <li>2. All slots that have registered must be tracked, either by free
pool or allocated pool.</li>
- * <li>3. All slot requests will be handled by best efforts, there is no
guarantee that one request will be
- * fulfilled in time or correctly allocated. Conflicts or timeout or some
special error will happen, it should
- * be handled outside SlotManager. SlotManager will make each decision
based on the information it currently
- * holds.</li>
- * </ul>
- * <b>IMPORTANT:</b> This class is <b>Not Thread-safe</b>.
+ * The slot manager is responsible for maintaining a view on all
registered task manager slots,
+ * their allocation and all pending slot requests. Whenever a new slot is
registered or an
+ * allocated slot is freed, then it tries to fulfill another pending slot
request. Whenever there
+ * are not enough slots available the slot manager will notify the
resource manager about it via
+ * {@link ResourceManagerActions#allocateResource(ResourceProfile)}.
+ *
+ * In order to free resources and avoid resource leaks, idling task
managers (task managers whose
+ * slots are currently not used) and not fulfilled pending slot requests
time out triggering their
+ * release and failure, respectively.
*/
-public abstract class SlotManager {
+public class SlotManager implements AutoCloseable {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(SlotManager.class);
+
+ /** Scheduled executor for timeouts */
+ private final ScheduledExecutor scheduledExecutor;
+
+ /** Timeout for slot requests to the task manager */
+ private final Time taskManagerRequestTimeout;
+
+ /** Timeout after which an allocation is discarded */
+ private final Time slotRequestTimeout;
+
+ /** Timeout after which an unused TaskManager is released */
+ private final Time taskManagerTimeout;
+
+ /** Map for all registered slots */
+ private final HashMap<SlotID, TaskManagerSlot> slots;
- protected final Logger LOG = LoggerFactory.getLogger(getClass());
+ /** Index of all currently free slots */
+ private final LinkedHashMap<SlotID, TaskManagerSlot> freeSlots;
- /** The Resource allocation provider */
- protected final ResourceManagerServices rmServices;
+ /** All currently registered task managers */
+ private final HashMap<InstanceID, TaskManagerRegistration>
taskManagerRegistrations;
- /** All registered task managers with ResourceID and gateway. */
- private final Map<ResourceID, TaskExecutorRegistration> taskManagers;
+ /** Map of fulfilled and active allocations for request deduplication
purposes */
+ private final HashMap<AllocationID, SlotID> fulfilledSlotRequests;
- /** All registered slots, including free and allocated slots */
- private final Map<ResourceID, Map<SlotID, ResourceSlot>>
registeredSlots;
+ /** Map of pending/unfulfilled slot allocation requests */
+ private final HashMap<AllocationID, PendingSlotRequest>
pendingSlotRequests;
- /** All pending slot requests, waiting available slots to fulfil */
- private final Map<AllocationID, SlotRequest> pendingSlotRequests;
+ /** Leader id of the containing component */
+ private UUID leaderId;
- /** All free slots that can be used to be allocated */
- private final Map<SlotID, ResourceSlot> freeSlots;
+ /** Executor for future callbacks which have to be "synchronized" */
+ private Executor mainThreadExecutor;
- /** All allocations, we can lookup allocations either by SlotID or
AllocationID */
- private final AllocationMap allocationMap;
+ /** Callbacks for resource (de-)allocations */
+ private ResourceManagerActions resourceManagerActions;
- private final Time timeout;
+ /** True iff the component has been started */
+ private boolean started;
- public SlotManager(ResourceManagerServices rmServices) {
- this.rmServices = checkNotNull(rmServices);
- this.registeredSlots = new HashMap<>(16);
- this.pendingSlotRequests = new LinkedHashMap<>(16);
- this.freeSlots = new HashMap<>(16);
- this.allocationMap = new AllocationMap();
- this.taskManagers = new HashMap<>();
- this.timeout = Time.seconds(10);
+ public SlotManager(
+ ScheduledExecutor scheduledExecutor,
+ Time taskManagerRequestTimeout,
+ Time slotRequestTimeout,
+ Time taskManagerTimeout) {
+ this.scheduledExecutor =
Preconditions.checkNotNull(scheduledExecutor);
+ this.taskManagerRequestTimeout =
Preconditions.checkNotNull(taskManagerRequestTimeout);
+ this.slotRequestTimeout =
Preconditions.checkNotNull(slotRequestTimeout);
+ this.taskManagerTimeout =
Preconditions.checkNotNull(taskManagerTimeout);
+
+ slots = new HashMap<>(16);
+ freeSlots = new LinkedHashMap<>(16);
+ taskManagerRegistrations = new HashMap<>(4);
+ fulfilledSlotRequests = new HashMap<>(16);
+ pendingSlotRequests = new HashMap<>(16);
+
+ leaderId = null;
+ resourceManagerActions = null;
+ started = false;
}
- //
------------------------------------------------------------------------
- // slot managements
- //
------------------------------------------------------------------------
+ //
---------------------------------------------------------------------------------------------
+ // Component lifecycle methods
+ //
---------------------------------------------------------------------------------------------
/**
- * Request a slot with requirements, we may either fulfill the request
or pending it. Trigger container
- * allocation if we don't have enough resource. If we have free slot
which can match the request, record
- * this allocation and forward the request to TaskManager through
ResourceManager (we want this done by
- * RPC's main thread to avoid race condition).
+ * Starts the slot manager with the given leader id and resource
manager actions.
*
- * @param request The detailed request of the slot
- * @return RMSlotRequestRegistered The confirmation message to be send
to the caller
+ * @param newLeaderId to use for communication with the task managers
+ * @param newResourceManagerActions to use for resource (de-)allocations
+ */
+ public void start(UUID newLeaderId, Executor newMainThreadExecutor,
ResourceManagerActions newResourceManagerActions) {
+ leaderId = Preconditions.checkNotNull(newLeaderId);
+ mainThreadExecutor =
Preconditions.checkNotNull(newMainThreadExecutor);
+ resourceManagerActions =
Preconditions.checkNotNull(newResourceManagerActions);
+
+ started = true;
+ }
+
+ /**
+ * Suspends the component. This clears the internal state of the slot
manager.
*/
- public RMSlotRequestRegistered requestSlot(final SlotRequest request) {
- final AllocationID allocationId = request.getAllocationId();
- if (isRequestDuplicated(request)) {
- LOG.warn("Duplicated slot request, AllocationID:{}",
allocationId);
- return new RMSlotRequestRegistered(allocationId);
+ public void suspend() {
+ for (PendingSlotRequest pendingSlotRequest :
pendingSlotRequests.values()) {
+ cancelPendingSlotRequest(pendingSlotRequest);
}
- // try to fulfil the request with current free slots
- final ResourceSlot slot = chooseSlotToUse(request, freeSlots);
- if (slot != null) {
- LOG.info("Assigning SlotID({}) to AllocationID({}),
JobID:{}", slot.getSlotId(),
- allocationId, request.getJobId());
+ pendingSlotRequests.clear();
+
+ HashSet<InstanceID> registeredTaskManagers = new
HashSet<>(taskManagerRegistrations.keySet());
+
+ for (InstanceID registeredTaskManager : registeredTaskManagers)
{
+ unregisterTaskManager(registeredTaskManager);
+ }
+
+ leaderId = null;
+ resourceManagerActions = null;
+ started = false;
+ }
+
+ /**
+ * Closes the slot manager.
+ *
+ * @throws Exception if the close operation fails
+ */
+ @Override
+ public void close() throws Exception {
+ suspend();
+ }
+
+ //
---------------------------------------------------------------------------------------------
+ // Public API
+ //
---------------------------------------------------------------------------------------------
+
+ /**
+ * Requests a slot with the respective resource profile.
+ *
+ * @param slotRequest specifying the requested slot specs
+ * @return true if the slot request was registered; false if the
request is a duplicate
+ * @throws SlotManagerException if the slot request failed (e.g. not
enough resources left)
+ */
+ public boolean registerSlotRequest(SlotRequest slotRequest) throws
SlotManagerException {
+ checkInit();
- // record this allocation in bookkeeping
- allocationMap.addAllocation(slot.getSlotId(),
allocationId);
- // remove selected slot from free pool
- freeSlots.remove(slot.getSlotId());
+ if (checkDuplicateRequest(slotRequest.getAllocationId())) {
+ LOG.debug("Ignoring a duplicate slot request with
allocation id {}.", slotRequest.getAllocationId());
- sendSlotRequest(slot, request);
+ return false;
} else {
- LOG.info("Cannot fulfil slot request, try to allocate a
new container for it, " +
- "AllocationID:{}, JobID:{}", allocationId,
request.getJobId());
- Preconditions.checkState(rmServices != null,
- "Attempted to allocate resources but no
ResourceManagerServices set.");
-
rmServices.allocateResource(request.getResourceProfile());
- pendingSlotRequests.put(allocationId, request);
+ PendingSlotRequest pendingSlotRequest = new
PendingSlotRequest(slotRequest);
+
+ pendingSlotRequests.put(slotRequest.getAllocationId(),
pendingSlotRequest);
+
+ try {
+ internalRequestSlot(pendingSlotRequest);
+ } catch (ResourceManagerException e) {
+ // requesting the slot failed --> remove
pending slot request
+
pendingSlotRequests.remove(slotRequest.getAllocationId());
+
+ throw new SlotManagerException("Could not
fulfill slot request " + slotRequest.getAllocationId() + '.', e);
+ }
+
+ return true;
}
+ }
- return new RMSlotRequestRegistered(allocationId);
+ /**
+ * Cancels and removes a pending slot request with the given allocation
id. If there is no such
+ * pending request, then nothing is done.
+ *
+ * @param allocationId identifying the pending slot request
+ * @return True if a pending slot request was found; otherwise false
+ */
+ public boolean unregisterSlotRequest(AllocationID allocationId) {
+ checkInit();
+
+ PendingSlotRequest pendingSlotRequest =
pendingSlotRequests.remove(allocationId);
+
+ if (null != pendingSlotRequest) {
+ cancelPendingSlotRequest(pendingSlotRequest);
+
+ return true;
+ } else {
+ LOG.debug("No pending slot request with allocation id
{} found.", allocationId);
+
+ return false;
+ }
}
/**
- * Notifies the SlotManager that a slot is available again after being
allocated.
- * @param slotID slot id of available slot
+ * Registers a new task manager at the slot manager. This will make the
task manager's slots
+ * known and, thus, available for allocation.
+ *
+ * @param taskExecutorConnection for the new task manager
+ * @param initialSlotReport for the new task manager
*/
- public void notifySlotAvailable(ResourceID resourceID, SlotID slotID) {
- if (!allocationMap.isAllocated(slotID)) {
- throw new IllegalStateException("Slot was not
previously allocated but " +
- "TaskManager reports it as available again");
+ public void registerTaskManager(final TaskExecutorConnection
taskExecutorConnection, SlotReport initialSlotReport) {
+ checkInit();
+
+ // we identify task managers by their instance id
+ if
(!taskManagerRegistrations.containsKey(taskExecutorConnection.getInstanceID()))
{
+ TaskManagerRegistration taskManagerRegistration = new
TaskManagerRegistration(taskExecutorConnection);
+
taskManagerRegistrations.put(taskExecutorConnection.getInstanceID(),
taskManagerRegistration);
}
- allocationMap.removeAllocation(slotID);
- final Map<SlotID, ResourceSlot> slots =
registeredSlots.get(resourceID);
- ResourceSlot freeSlot = slots.get(slotID);
- if (freeSlot == null) {
- throw new IllegalStateException("Slot was not
registered with SlotManager but " +
- "TaskManager reported it to be available.");
+
+ reportSlotStatus(taskExecutorConnection.getInstanceID(),
initialSlotReport);
+ }
+
+ /**
+ * Unregisters the task manager identified by the given instance id and
its associated slots
+ * from the slot manager.
+ *
+ * @param instanceId identifying the task manager to unregister
+ * @return True if there existed a registered task manager with the
given instance id
+ */
+ public boolean unregisterTaskManager(InstanceID instanceId) {
+ checkInit();
+
+ TaskManagerRegistration taskManagerRegistration =
taskManagerRegistrations.remove(instanceId);
+
+ if (null != taskManagerRegistration) {
+ removeSlots(taskManagerRegistration.getSlots());
+
+ taskManagerRegistration.cancelTimeout();
+
+ return true;
+ } else {
+ LOG.debug("There is no task manager registered with
instance ID {}. Ignoring this message.", instanceId);
+
+ return false;
}
- handleFreeSlot(freeSlot);
}
/**
- * The slot request to TaskManager may be either failed by rpc
communication (timeout, network error, etc.)
- * or really rejected by TaskManager. We shall retry this request by:
- * <ul>
- * <li>1. verify and clear all the previous allocate information for
this request
- * <li>2. try to request slot again
- * </ul>
- * <p>
- * This may cause some duplicate allocation, e.g. the slot request to
TaskManager is successful but the response
- * is lost somehow, so we may request a slot in another TaskManager,
this causes two slots assigned to one request,
- * but it can be taken care of by rejecting registration at JobManager.
+ * Reports the current slot allocations for a task manager identified
by the given instance id.
*
- * @param originalRequest The original slot request
- * @param slotId The target SlotID
+ * @param instanceId identifying the task manager for which to report
the slot status
+ * @param slotReport containing the status for all of its slots
+ * @return true if the slot status has been updated successfully,
otherwise false
*/
- void handleSlotRequestFailedAtTaskManager(final SlotRequest
originalRequest, final SlotID slotId) {
- final AllocationID originalAllocationId =
originalRequest.getAllocationId();
- LOG.info("Slot request failed at TaskManager, SlotID:{},
AllocationID:{}, JobID:{}",
- slotId, originalAllocationId,
originalRequest.getJobId());
-
- if (allocationMap.isAllocated(slotId)) {
- final AllocationID expectedAllocationId =
allocationMap.getAllocationID(slotId);
-
- // check whether we have an agreement on whom this slot
belongs to
- if (originalAllocationId.equals(expectedAllocationId)) {
- LOG.info("De-allocate this request and retry");
-
allocationMap.removeAllocation(expectedAllocationId);
-
pendingSlotRequests.put(originalRequest.getAllocationId(), originalRequest);
- ResourceSlot slot =
checkNotNull(getRegisteredSlot(slotId));
- // treat this slot as empty and retry with a
different request
- handleFreeSlot(slot);
+ public boolean reportSlotStatus(InstanceID instanceId, SlotReport
slotReport) {
+ checkInit();
+
+ TaskManagerRegistration taskManagerRegistration =
taskManagerRegistrations.get(instanceId);
+
+ if (null != taskManagerRegistration) {
+ HashSet<SlotID> slotsToRemove = new
HashSet<>(taskManagerRegistration.getSlots());
+ boolean idle = true;
+
+ for (SlotStatus slotStatus : slotReport) {
+ if
(slotsToRemove.remove(slotStatus.getSlotID())) {
+ // slot which was already registered
+ updateSlot(slotStatus.getSlotID(),
slotStatus.getAllocationID());
+ } else {
+ // new slot
+ registerSlot(
+ instanceId,
+ slotStatus.getSlotID(),
+ slotStatus.getAllocationID(),
+ slotStatus.getResourceProfile(),
+
taskManagerRegistration.getTaskManagerConnection());
+ }
+
+ TaskManagerSlot slot =
slots.get(slotStatus.getSlotID());
+
+ idle &= slot.isFree();
+ }
+
+ // remove the slots for which we haven't received a
slot status message
+ removeSlots(slotsToRemove);
+
+ if (idle) {
--- End diff --
I assume this breaks the TaskManager timeouts. Every time a heartbeat arrives
(with a slot report), the TaskManager is detected to be idle, and the timeout
is scheduled again, overriding the previous timeout (pushing the timeout further
back). As long as heartbeats keep arriving, the timeout may therefore never fire.
> Harden SlotManager
> ------------------
>
> Key: FLINK-5810
> URL: https://issues.apache.org/jira/browse/FLINK-5810
> Project: Flink
> Issue Type: Sub-task
> Components: Distributed Coordination
> Affects Versions: 1.3.0
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
>
> Harden the {{SlotManager}} logic to better cope with lost messages.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)