[ https://issues.apache.org/jira/browse/FLINK-5810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15928750#comment-15928750 ]
ASF GitHub Bot commented on FLINK-5810:
---------------------------------------
Github user StephanEwen commented on a diff in the pull request:
https://github.com/apache/flink/pull/3394#discussion_r106507099
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
---
@@ -21,519 +21,897 @@
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.clusterframework.types.ResourceSlot;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.TaskManagerSlot;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.concurrent.BiFunction;
+import org.apache.flink.runtime.concurrent.CompletableFuture;
import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerServices;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.resourcemanager.SlotRequest;
-import
org.apache.flink.runtime.resourcemanager.messages.jobmanager.RMSlotRequestRegistered;
-import
org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRejected;
-import
org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestReply;
-import
org.apache.flink.runtime.resourcemanager.registration.TaskExecutorRegistration;
+import
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+import
org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.SlotStatus;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import
org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
+import
org.apache.flink.runtime.taskexecutor.exceptions.SlotOccupiedException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
+import java.util.Objects;
+import java.util.UUID;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
/**
- * SlotManager is responsible for receiving slot requests and do slot
allocations. It allows to request
- * slots from registered TaskManagers and issues container allocation
requests in case of there are not
- * enough available slots. Besides, it should sync its slot allocation
with TaskManager's heartbeat.
- * <p>
- * The main operation principle of SlotManager is:
- * <ul>
- * <li>1. All slot allocation status should be synced with TaskManager,
which is the ground truth.</li>
- * <li>2. All slots that have registered must be tracked, either by free
pool or allocated pool.</li>
- * <li>3. All slot requests will be handled by best efforts, there is no
guarantee that one request will be
- * fulfilled in time or correctly allocated. Conflicts or timeout or some
special error will happen, it should
- * be handled outside SlotManager. SlotManager will make each decision
based on the information it currently
- * holds.</li>
- * </ul>
- * <b>IMPORTANT:</b> This class is <b>Not Thread-safe</b>.
+ * The slot manager is responsible for maintaining a view on all
registered task manager slots,
+ * their allocation and all pending slot requests. Whenever a new slot is
registered or and
+ * allocated slot is freed, then it tries to fulfill another pending slot
request. Whenever there
+ * are not enough slots available the slot manager will notify the
resource manager about it via
+ * {@link ResourceManagerActions#allocateResource(ResourceProfile)}.
+ *
+ * In order to free resources and avoid resource leaks, idling task
managers (task managers whose
+ * slots are currently not used) and not fulfilled pending slot requests
time out triggering their
+ * release and failure, respectively.
*/
-public abstract class SlotManager {
+public class SlotManager implements AutoCloseable {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(SlotManager.class);
+
+ /** Scheduled executor for timeouts */
+ private final ScheduledExecutor scheduledExecutor;
+
+ /** Timeout for slot requests to the task manager */
+ private final Time taskManagerRequestTimeout;
+
+ /** Timeout after which an allocation is discarded */
+ private final Time slotRequestTimeout;
+
+ /** Timeout after which an unused TaskManager is released */
+ private final Time taskManagerTimeout;
+
+ /** Map for all registered slots */
+ private final HashMap<SlotID, TaskManagerSlot> slots;
- protected final Logger LOG = LoggerFactory.getLogger(getClass());
+ /** Index of all currently free slots */
+ private final LinkedHashMap<SlotID, TaskManagerSlot> freeSlots;
- /** The Resource allocation provider */
- protected final ResourceManagerServices rmServices;
+ /** All currently registered task managers */
+ private final HashMap<InstanceID, TaskManagerRegistration>
taskManagerRegistrations;
- /** All registered task managers with ResourceID and gateway. */
- private final Map<ResourceID, TaskExecutorRegistration> taskManagers;
+ /** Map of fulfilled and active allocations for request deduplication
purposes */
+ private final HashMap<AllocationID, SlotID> fulfilledSlotRequests;
- /** All registered slots, including free and allocated slots */
- private final Map<ResourceID, Map<SlotID, ResourceSlot>>
registeredSlots;
+ /** Map of pending/unfulfilled slot allocation requests */
+ private final HashMap<AllocationID, PendingSlotRequest>
pendingSlotRequests;
- /** All pending slot requests, waiting available slots to fulfil */
- private final Map<AllocationID, SlotRequest> pendingSlotRequests;
+ /** Leader id of the containing component */
+ private UUID leaderId;
- /** All free slots that can be used to be allocated */
- private final Map<SlotID, ResourceSlot> freeSlots;
+ /** Executor for future callbacks which have to be "synchronized" */
+ private Executor mainThreadExecutor;
- /** All allocations, we can lookup allocations either by SlotID or
AllocationID */
- private final AllocationMap allocationMap;
+ /** Callbacks for resource (de-)allocations */
+ private ResourceManagerActions resourceManagerActions;
- private final Time timeout;
+ /** True iff the component has been started */
+ private boolean started;
- public SlotManager(ResourceManagerServices rmServices) {
- this.rmServices = checkNotNull(rmServices);
- this.registeredSlots = new HashMap<>(16);
- this.pendingSlotRequests = new LinkedHashMap<>(16);
- this.freeSlots = new HashMap<>(16);
- this.allocationMap = new AllocationMap();
- this.taskManagers = new HashMap<>();
- this.timeout = Time.seconds(10);
+ public SlotManager(
+ ScheduledExecutor scheduledExecutor,
+ Time taskManagerRequestTimeout,
+ Time slotRequestTimeout,
+ Time taskManagerTimeout) {
+ this.scheduledExecutor =
Preconditions.checkNotNull(scheduledExecutor);
+ this.taskManagerRequestTimeout =
Preconditions.checkNotNull(taskManagerRequestTimeout);
+ this.slotRequestTimeout =
Preconditions.checkNotNull(slotRequestTimeout);
+ this.taskManagerTimeout =
Preconditions.checkNotNull(taskManagerTimeout);
+
+ slots = new HashMap<>(16);
+ freeSlots = new LinkedHashMap<>(16);
+ taskManagerRegistrations = new HashMap<>(4);
+ fulfilledSlotRequests = new HashMap<>(16);
+ pendingSlotRequests = new HashMap<>(16);
+
+ leaderId = null;
+ resourceManagerActions = null;
+ started = false;
}
- //
------------------------------------------------------------------------
- // slot managements
- //
------------------------------------------------------------------------
+ //
---------------------------------------------------------------------------------------------
+ // Component lifecycle methods
+ //
---------------------------------------------------------------------------------------------
/**
- * Request a slot with requirements, we may either fulfill the request
or pending it. Trigger container
- * allocation if we don't have enough resource. If we have free slot
which can match the request, record
- * this allocation and forward the request to TaskManager through
ResourceManager (we want this done by
- * RPC's main thread to avoid race condition).
+ * Starts the slot manager with the given leader id and resource
manager actions.
*
- * @param request The detailed request of the slot
- * @return RMSlotRequestRegistered The confirmation message to be send
to the caller
+ * @param newLeaderId to use for communication with the task managers
+ * @param newResourceManagerActions to use for resource (de-)allocations
+ */
+ public void start(UUID newLeaderId, Executor newMainThreadExecutor,
ResourceManagerActions newResourceManagerActions) {
+ leaderId = Preconditions.checkNotNull(newLeaderId);
+ mainThreadExecutor =
Preconditions.checkNotNull(newMainThreadExecutor);
+ resourceManagerActions =
Preconditions.checkNotNull(newResourceManagerActions);
+
+ started = true;
+ }
+
+ /**
+ * Suspends the component. This clears the internal state of the slot
manager.
*/
- public RMSlotRequestRegistered requestSlot(final SlotRequest request) {
- final AllocationID allocationId = request.getAllocationId();
- if (isRequestDuplicated(request)) {
- LOG.warn("Duplicated slot request, AllocationID:{}",
allocationId);
- return new RMSlotRequestRegistered(allocationId);
+ public void suspend() {
+ for (PendingSlotRequest pendingSlotRequest :
pendingSlotRequests.values()) {
+ cancelPendingSlotRequest(pendingSlotRequest);
}
- // try to fulfil the request with current free slots
- final ResourceSlot slot = chooseSlotToUse(request, freeSlots);
- if (slot != null) {
- LOG.info("Assigning SlotID({}) to AllocationID({}),
JobID:{}", slot.getSlotId(),
- allocationId, request.getJobId());
+ pendingSlotRequests.clear();
+
+ HashSet<InstanceID> registeredTaskManagers = new
HashSet<>(taskManagerRegistrations.keySet());
--- End diff --
If the copy of the collection only exists so that we can iterate over the
registered task managers while the underlying map is being modified, I would
make it an `ArrayList`: it is both faster to build and faster to iterate.
> Harden SlotManager
> ------------------
>
> Key: FLINK-5810
> URL: https://issues.apache.org/jira/browse/FLINK-5810
> Project: Flink
> Issue Type: Sub-task
> Components: Distributed Coordination
> Affects Versions: 1.3.0
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
>
> Harden the {{SlotManager}} logic to better cope with lost messages.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)