This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 052599d9a5f6c96eb2ba546d5b66453c8d09ed2f Author: Xintong Song <tonysong...@gmail.com> AuthorDate: Tue Jul 30 11:25:16 2019 +0200 [hotfix][runtime] Extract SlotManager interface Rename SlotManager -> SlotManagerImpl and let it implement the SlotManager interface --- .../ResourceManagerRuntimeServices.java | 3 +- .../resourcemanager/slotmanager/SlotManager.java | 1105 +------------------- .../{SlotManager.java => SlotManagerImpl.java} | 40 +- .../slotmanager/SlotManagerBuilder.java | 6 +- .../slotmanager/SlotManagerTest.java | 46 +- .../TaskManagerReleaseInSlotManagerTest.java | 12 +- 6 files changed, 77 insertions(+), 1135 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRuntimeServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRuntimeServices.java index 0c0ba89..f2cf015 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRuntimeServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRuntimeServices.java @@ -22,6 +22,7 @@ import org.apache.flink.runtime.concurrent.ScheduledExecutor; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerConfiguration; +import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl; import org.apache.flink.util.Preconditions; /** @@ -54,7 +55,7 @@ public class ResourceManagerRuntimeServices { final SlotManagerConfiguration slotManagerConfiguration = configuration.getSlotManagerConfiguration(); - final SlotManager slotManager = new SlotManager( + final SlotManager slotManager = new SlotManagerImpl( scheduledExecutor, slotManagerConfiguration.getTaskManagerRequestTimeout(), slotManagerConfiguration.getSlotRequestTimeout(), diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java old mode 100755 new mode 100644 index 71f3df6..5b709d3 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java @@ -19,47 +19,16 @@ package org.apache.flink.runtime.resourcemanager.slotmanager; import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.clusterframework.types.SlotID; -import org.apache.flink.runtime.clusterframework.types.TaskManagerSlot; -import org.apache.flink.runtime.concurrent.ScheduledExecutor; import org.apache.flink.runtime.instance.InstanceID; -import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.resourcemanager.ResourceManagerId; import org.apache.flink.runtime.resourcemanager.SlotRequest; -import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException; import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection; import org.apache.flink.runtime.taskexecutor.SlotReport; -import org.apache.flink.runtime.taskexecutor.SlotStatus; -import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; -import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException; -import org.apache.flink.runtime.taskexecutor.exceptions.SlotOccupiedException; -import org.apache.flink.util.FlinkException; -import org.apache.flink.util.Preconditions; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.annotation.Nonnull; -import javax.annotation.Nullable; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.Map; -import java.util.Objects; -import java.util.Optional; -import java.util.concurrent.CancellationException; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; /** * The slot manager is responsible for maintaining a view on all registered task manager slots, @@ -72,142 +41,20 @@ import java.util.concurrent.TimeoutException; * slots are currently not used) and pending slot requests time out triggering their release and * failure, respectively. */ -public class SlotManager implements AutoCloseable { - private static final Logger LOG = LoggerFactory.getLogger(SlotManager.class); - - /** Scheduled executor for timeouts. */ - private final ScheduledExecutor scheduledExecutor; - - /** Timeout for slot requests to the task manager. */ - private final Time taskManagerRequestTimeout; - - /** Timeout after which an allocation is discarded. */ - private final Time slotRequestTimeout; - - /** Timeout after which an unused TaskManager is released. */ - private final Time taskManagerTimeout; - - /** Map for all registered slots. */ - private final HashMap<SlotID, TaskManagerSlot> slots; - - /** Index of all currently free slots. */ - private final LinkedHashMap<SlotID, TaskManagerSlot> freeSlots; +public interface SlotManager extends AutoCloseable { + int getNumberRegisteredSlots(); - /** All currently registered task managers. */ - private final HashMap<InstanceID, TaskManagerRegistration> taskManagerRegistrations; + int getNumberRegisteredSlotsOf(InstanceID instanceId); - /** Map of fulfilled and active allocations for request deduplication purposes. */ - private final HashMap<AllocationID, SlotID> fulfilledSlotRequests; + int getNumberFreeSlots(); - /** Map of pending/unfulfilled slot allocation requests. */ - private final HashMap<AllocationID, PendingSlotRequest> pendingSlotRequests; + int getNumberFreeSlotsOf(InstanceID instanceId); - private final HashMap<TaskManagerSlotId, PendingTaskManagerSlot> pendingSlots; + int getNumberPendingTaskManagerSlots(); - /** ResourceManager's id. */ - private ResourceManagerId resourceManagerId; + int getNumberPendingSlotRequests(); - /** Executor for future callbacks which have to be "synchronized". */ - private Executor mainThreadExecutor; - - /** Callbacks for resource (de-)allocations. */ - private ResourceActions resourceActions; - - private ScheduledFuture<?> taskManagerTimeoutCheck; - - private ScheduledFuture<?> slotRequestTimeoutCheck; - - /** True iff the component has been started. */ - private boolean started; - - /** Release task executor only when each produced result partition is either consumed or failed. */ - private final boolean waitResultConsumedBeforeRelease; - - /** - * If true, fail unfulfillable slot requests immediately. Otherwise, allow unfulfillable request to pend. - * - * A slot request is considered unfulfillable if it cannot be fulfilled by neither a slot that is already registered - * (including allocated ones) nor a pending slot that the {@link ResourceActions} can allocate. - * */ - private boolean failUnfulfillableRequest = false; - - public SlotManager( - ScheduledExecutor scheduledExecutor, - Time taskManagerRequestTimeout, - Time slotRequestTimeout, - Time taskManagerTimeout, - boolean waitResultConsumedBeforeRelease) { - - this.scheduledExecutor = Preconditions.checkNotNull(scheduledExecutor); - this.taskManagerRequestTimeout = Preconditions.checkNotNull(taskManagerRequestTimeout); - this.slotRequestTimeout = Preconditions.checkNotNull(slotRequestTimeout); - this.taskManagerTimeout = Preconditions.checkNotNull(taskManagerTimeout); - this.waitResultConsumedBeforeRelease = waitResultConsumedBeforeRelease; - - slots = new HashMap<>(16); - freeSlots = new LinkedHashMap<>(16); - taskManagerRegistrations = new HashMap<>(4); - fulfilledSlotRequests = new HashMap<>(16); - pendingSlotRequests = new HashMap<>(16); - pendingSlots = new HashMap<>(16); - - resourceManagerId = null; - resourceActions = null; - mainThreadExecutor = null; - taskManagerTimeoutCheck = null; - slotRequestTimeoutCheck = null; - - started = false; - } - - public int getNumberRegisteredSlots() { - return slots.size(); - } - - public int getNumberRegisteredSlotsOf(InstanceID instanceId) { - TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(instanceId); - - if (taskManagerRegistration != null) { - return taskManagerRegistration.getNumberRegisteredSlots(); - } else { - return 0; - } - } - - public int getNumberFreeSlots() { - return freeSlots.size(); - } - - public int getNumberFreeSlotsOf(InstanceID instanceId) { - TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(instanceId); - - if (taskManagerRegistration != null) { - return taskManagerRegistration.getNumberFreeSlots(); - } else { - return 0; - } - } - - public int getNumberPendingTaskManagerSlots() { - return pendingSlots.size(); - } - - public int getNumberPendingSlotRequests() { - return pendingSlotRequests.size(); - } - - public boolean isFailingUnfulfillableRequest() { - return failUnfulfillableRequest; - } - - @VisibleForTesting - int getNumberAssignedPendingTaskManagerSlots() { - return (int) pendingSlots.values().stream().filter(slot -> slot.getAssignedPendingSlotRequest() != null).count(); - } - - // --------------------------------------------------------------------------------------------- - // Component lifecycle methods - // --------------------------------------------------------------------------------------------- + boolean isFailingUnfulfillableRequest(); /** * Starts the slot manager with the given leader id and resource manager actions. @@ -216,79 +63,12 @@ public class SlotManager implements AutoCloseable { * @param newMainThreadExecutor to use to run code in the ResourceManager's main thread * @param newResourceActions to use for resource (de-)allocations */ - public void start(ResourceManagerId newResourceManagerId, Executor newMainThreadExecutor, ResourceActions newResourceActions) { - LOG.info("Starting the SlotManager."); - - this.resourceManagerId = Preconditions.checkNotNull(newResourceManagerId); - mainThreadExecutor = Preconditions.checkNotNull(newMainThreadExecutor); - resourceActions = Preconditions.checkNotNull(newResourceActions); - - started = true; - - taskManagerTimeoutCheck = scheduledExecutor.scheduleWithFixedDelay( - () -> mainThreadExecutor.execute( - () -> checkTaskManagerTimeouts()), - 0L, - taskManagerTimeout.toMilliseconds(), - TimeUnit.MILLISECONDS); - - slotRequestTimeoutCheck = scheduledExecutor.scheduleWithFixedDelay( - () -> mainThreadExecutor.execute( - () -> checkSlotRequestTimeouts()), - 0L, - slotRequestTimeout.toMilliseconds(), - TimeUnit.MILLISECONDS); - } + void start(ResourceManagerId newResourceManagerId, Executor newMainThreadExecutor, ResourceActions newResourceActions); /** * Suspends the component. This clears the internal state of the slot manager. */ - public void suspend() { - LOG.info("Suspending the SlotManager."); - - // stop the timeout checks for the TaskManagers and the SlotRequests - if (taskManagerTimeoutCheck != null) { - taskManagerTimeoutCheck.cancel(false); - taskManagerTimeoutCheck = null; - } - - if (slotRequestTimeoutCheck != null) { - slotRequestTimeoutCheck.cancel(false); - slotRequestTimeoutCheck = null; - } - - for (PendingSlotRequest pendingSlotRequest : pendingSlotRequests.values()) { - cancelPendingSlotRequest(pendingSlotRequest); - } - - pendingSlotRequests.clear(); - - ArrayList<InstanceID> registeredTaskManagers = new ArrayList<>(taskManagerRegistrations.keySet()); - - for (InstanceID registeredTaskManager : registeredTaskManagers) { - unregisterTaskManager(registeredTaskManager); - } - - resourceManagerId = null; - resourceActions = null; - started = false; - } - - /** - * Closes the slot manager. - * - * @throws Exception if the close operation fails - */ - @Override - public void close() throws Exception { - LOG.info("Closing the SlotManager."); - - suspend(); - } - - // --------------------------------------------------------------------------------------------- - // Public API - // --------------------------------------------------------------------------------------------- + void suspend(); /** * Requests a slot with the respective resource profile. @@ -297,30 +77,7 @@ public class SlotManager implements AutoCloseable { * @return true if the slot request was registered; false if the request is a duplicate * @throws SlotManagerException if the slot request failed (e.g. not enough resources left) */ - public boolean registerSlotRequest(SlotRequest slotRequest) throws SlotManagerException { - checkInit(); - - if (checkDuplicateRequest(slotRequest.getAllocationId())) { - LOG.debug("Ignoring a duplicate slot request with allocation id {}.", slotRequest.getAllocationId()); - - return false; - } else { - PendingSlotRequest pendingSlotRequest = new PendingSlotRequest(slotRequest); - - pendingSlotRequests.put(slotRequest.getAllocationId(), pendingSlotRequest); - - try { - internalRequestSlot(pendingSlotRequest); - } catch (ResourceManagerException e) { - // requesting the slot failed --> remove pending slot request - pendingSlotRequests.remove(slotRequest.getAllocationId()); - - throw new SlotManagerException("Could not fulfill slot request " + slotRequest.getAllocationId() + '.', e); - } - - return true; - } - } + boolean registerSlotRequest(SlotRequest slotRequest) throws SlotManagerException; /** * Cancels and removes a pending slot request with the given allocation id. If there is no such @@ -329,23 +86,7 @@ public class SlotManager implements AutoCloseable { * @param allocationId identifying the pending slot request * @return True if a pending slot request was found; otherwise false */ - public boolean unregisterSlotRequest(AllocationID allocationId) { - checkInit(); - - PendingSlotRequest pendingSlotRequest = pendingSlotRequests.remove(allocationId); - - if (null != pendingSlotRequest) { - LOG.debug("Cancel slot request {}.", allocationId); - - cancelPendingSlotRequest(pendingSlotRequest); - - return true; - } else { - LOG.debug("No pending slot request with allocation id {} found. Ignoring unregistration request.", allocationId); - - return false; - } - } + boolean unregisterSlotRequest(AllocationID allocationId); /** * Registers a new task manager at the slot manager. This will make the task managers slots @@ -354,40 +95,7 @@ public class SlotManager implements AutoCloseable { * @param taskExecutorConnection for the new task manager * @param initialSlotReport for the new task manager */ - public void registerTaskManager(final TaskExecutorConnection taskExecutorConnection, SlotReport initialSlotReport) { - checkInit(); - - LOG.debug("Registering TaskManager {} under {} at the SlotManager.", taskExecutorConnection.getResourceID(), taskExecutorConnection.getInstanceID()); - - // we identify task managers by their instance id - if (taskManagerRegistrations.containsKey(taskExecutorConnection.getInstanceID())) { - reportSlotStatus(taskExecutorConnection.getInstanceID(), initialSlotReport); - } else { - // first register the TaskManager - ArrayList<SlotID> reportedSlots = new ArrayList<>(); - - for (SlotStatus slotStatus : initialSlotReport) { - reportedSlots.add(slotStatus.getSlotID()); - } - - TaskManagerRegistration taskManagerRegistration = new TaskManagerRegistration( - taskExecutorConnection, - reportedSlots); - - taskManagerRegistrations.put(taskExecutorConnection.getInstanceID(), taskManagerRegistration); - - // next register the new slots - for (SlotStatus slotStatus : initialSlotReport) { - registerSlot( - slotStatus.getSlotID(), - slotStatus.getAllocationID(), - slotStatus.getJobID(), - slotStatus.getResourceProfile(), - taskExecutorConnection); - } - } - - } + void registerTaskManager(TaskExecutorConnection taskExecutorConnection, SlotReport initialSlotReport); /** * Unregisters the task manager identified by the given instance id and its associated slots @@ -396,23 +104,7 @@ public class SlotManager implements AutoCloseable { * @param instanceId identifying the task manager to unregister * @return True if there existed a registered task manager with the given instance id */ - public boolean unregisterTaskManager(InstanceID instanceId) { - checkInit(); - - LOG.debug("Unregister TaskManager {} from the SlotManager.", instanceId); - - TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.remove(instanceId); - - if (null != taskManagerRegistration) { - internalUnregisterTaskManager(taskManagerRegistration); - - return true; - } else { - LOG.debug("There is no task manager registered with instance ID {}. Ignoring this message.", instanceId); - - return false; - } - } + boolean unregisterTaskManager(InstanceID instanceId); /** * Reports the current slot allocations for a task manager identified by the given instance id. @@ -421,26 +113,7 @@ public class SlotManager implements AutoCloseable { * @param slotReport containing the status for all of its slots * @return true if the slot status has been updated successfully, otherwise false */ - public boolean reportSlotStatus(InstanceID instanceId, SlotReport slotReport) { - checkInit(); - - LOG.debug("Received slot report from instance {}: {}.", instanceId, slotReport); - - TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(instanceId); - - if (null != taskManagerRegistration) { - - for (SlotStatus slotStatus : slotReport) { - updateSlot(slotStatus.getSlotID(), slotStatus.getAllocationID(), slotStatus.getJobID()); - } - - return true; - } else { - LOG.debug("Received slot report for unknown task manager with instance id {}. Ignoring this report.", instanceId); - - return false; - } - } + boolean reportSlotStatus(InstanceID instanceId, SlotReport slotReport); /** * Free the given slot from the given allocation. If the slot is still allocated by the given @@ -449,752 +122,10 @@ public class SlotManager implements AutoCloseable { * @param slotId identifying the slot to free * @param allocationId with which the slot is presumably allocated */ - public void freeSlot(SlotID slotId, AllocationID allocationId) { - checkInit(); - - TaskManagerSlot slot = slots.get(slotId); - - if (null != slot) { - if (slot.getState() == TaskManagerSlot.State.ALLOCATED) { - if (Objects.equals(allocationId, slot.getAllocationId())) { - - TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(slot.getInstanceId()); - - if (taskManagerRegistration == null) { - throw new IllegalStateException("Trying to free a slot from a TaskManager " + - slot.getInstanceId() + " which has not been registered."); - } - - updateSlotState(slot, taskManagerRegistration, null, null); - } else { - LOG.debug("Received request to free slot {} with expected allocation id {}, " + - "but actual allocation id {} differs. Ignoring the request.", slotId, allocationId, slot.getAllocationId()); - } - } else { - LOG.debug("Slot {} has not been allocated.", allocationId); - } - } else { - LOG.debug("Trying to free a slot {} which has not been registered. Ignoring this message.", slotId); - } - } - - public void setFailUnfulfillableRequest(boolean failUnfulfillableRequest) { - if (!this.failUnfulfillableRequest && failUnfulfillableRequest) { - // fail unfulfillable pending requests - Iterator<Map.Entry<AllocationID, PendingSlotRequest>> slotRequestIterator = pendingSlotRequests.entrySet().iterator(); - while (slotRequestIterator.hasNext()) { - PendingSlotRequest pendingSlotRequest = slotRequestIterator.next().getValue(); - if (pendingSlotRequest.getAssignedPendingTaskManagerSlot() != null) { - continue; - } - if (!isFulfillableByRegisteredSlots(pendingSlotRequest.getResourceProfile())) { - slotRequestIterator.remove(); - resourceActions.notifyAllocationFailure( - pendingSlotRequest.getJobId(), - pendingSlotRequest.getAllocationId(), - new ResourceManagerException("Could not fulfill slot request " + pendingSlotRequest.getAllocationId() + ". " - + "Requested resource profile (" + pendingSlotRequest.getResourceProfile() + ") is unfulfillable.") - ); - } - } - } - this.failUnfulfillableRequest = failUnfulfillableRequest; - } - - // --------------------------------------------------------------------------------------------- - // Behaviour methods - // --------------------------------------------------------------------------------------------- - - /** - * Finds a matching slot request for a given resource profile. If there is no such request, - * the method returns null. - * - * <p>Note: If you want to change the behaviour of the slot manager wrt slot allocation and - * request fulfillment, then you should override this method. - * - * @param slotResourceProfile defining the resources of an available slot - * @return A matching slot request which can be deployed in a slot with the given resource - * profile. Null if there is no such slot request pending. - */ - protected PendingSlotRequest findMatchingRequest(ResourceProfile slotResourceProfile) { - - for (PendingSlotRequest pendingSlotRequest : pendingSlotRequests.values()) { - if (!pendingSlotRequest.isAssigned() && slotResourceProfile.isMatching(pendingSlotRequest.getResourceProfile())) { - return pendingSlotRequest; - } - } - - return null; - } - - /** - * Finds a matching slot for a given resource profile. A matching slot has at least as many - * resources available as the given resource profile. If there is no such slot available, then - * the method returns null. - * - * <p>Note: If you want to change the behaviour of the slot manager wrt slot allocation and - * request fulfillment, then you should override this method. - * - * @param requestResourceProfile specifying the resource requirements for the a slot request - * @return A matching slot which fulfills the given resource profile. Null if there is no such - * slot available. - */ - protected TaskManagerSlot findMatchingSlot(ResourceProfile requestResourceProfile) { - Iterator<Map.Entry<SlotID, TaskManagerSlot>> iterator = freeSlots.entrySet().iterator(); - - while (iterator.hasNext()) { - TaskManagerSlot taskManagerSlot = iterator.next().getValue(); - - // sanity check - Preconditions.checkState( - taskManagerSlot.getState() == TaskManagerSlot.State.FREE, - "TaskManagerSlot %s is not in state FREE but %s.", - taskManagerSlot.getSlotId(), taskManagerSlot.getState()); - - if (taskManagerSlot.getResourceProfile().isMatching(requestResourceProfile)) { - iterator.remove(); - return taskManagerSlot; - } - } - - return null; - } - - // --------------------------------------------------------------------------------------------- - // Internal slot operations - // --------------------------------------------------------------------------------------------- - - /** - * Registers a slot for the given task manager at the slot manager. The slot is identified by - * the given slot id. The given resource profile defines the available resources for the slot. - * The task manager connection can be used to communicate with the task manager. - * - * @param slotId identifying the slot on the task manager - * @param allocationId which is currently deployed in the slot - * @param resourceProfile of the slot - * @param taskManagerConnection to communicate with the remote task manager - */ - private void registerSlot( - SlotID slotId, - AllocationID allocationId, - JobID jobId, - ResourceProfile resourceProfile, - TaskExecutorConnection taskManagerConnection) { - - if (slots.containsKey(slotId)) { - // remove the old slot first - removeSlot(slotId); - } - - final TaskManagerSlot slot = createAndRegisterTaskManagerSlot(slotId, resourceProfile, taskManagerConnection); - - final PendingTaskManagerSlot pendingTaskManagerSlot; - - if (allocationId == null) { - pendingTaskManagerSlot = findExactlyMatchingPendingTaskManagerSlot(resourceProfile); - } else { - pendingTaskManagerSlot = null; - } - - if (pendingTaskManagerSlot == null) { - updateSlot(slotId, allocationId, jobId); - } else { - pendingSlots.remove(pendingTaskManagerSlot.getTaskManagerSlotId()); - final PendingSlotRequest assignedPendingSlotRequest = pendingTaskManagerSlot.getAssignedPendingSlotRequest(); - - if (assignedPendingSlotRequest == null) { - handleFreeSlot(slot); - } else { - assignedPendingSlotRequest.unassignPendingTaskManagerSlot(); - allocateSlot(slot, assignedPendingSlotRequest); - } - } - } - - @Nonnull - private TaskManagerSlot createAndRegisterTaskManagerSlot(SlotID slotId, ResourceProfile resourceProfile, TaskExecutorConnection taskManagerConnection) { - final TaskManagerSlot slot = new TaskManagerSlot( - slotId, - resourceProfile, - taskManagerConnection); - slots.put(slotId, slot); - return slot; - } - - @Nullable - private PendingTaskManagerSlot findExactlyMatchingPendingTaskManagerSlot(ResourceProfile resourceProfile) { - for (PendingTaskManagerSlot pendingTaskManagerSlot : pendingSlots.values()) { - if (pendingTaskManagerSlot.getResourceProfile().equals(resourceProfile)) { - return pendingTaskManagerSlot; - } - } - - return null; - } - - /** - * Updates a slot with the given allocation id. - * - * @param slotId to update - * @param allocationId specifying the current allocation of the slot - * @param jobId specifying the job to which the slot is allocated - * @return True if the slot could be updated; otherwise false - */ - private boolean updateSlot(SlotID slotId, AllocationID allocationId, JobID jobId) { - final TaskManagerSlot slot = slots.get(slotId); - - if (slot != null) { - final TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(slot.getInstanceId()); - - if (taskManagerRegistration != null) { - updateSlotState(slot, taskManagerRegistration, allocationId, jobId); + void freeSlot(SlotID slotId, AllocationID allocationId); - return true; - } else { - throw new IllegalStateException("Trying to update a slot from a TaskManager " + - slot.getInstanceId() + " which has not been registered."); - } - } else { - LOG.debug("Trying to update unknown slot with slot id {}.", slotId); - - return false; - } - } - - private void updateSlotState( - TaskManagerSlot slot, - TaskManagerRegistration taskManagerRegistration, - @Nullable AllocationID allocationId, - @Nullable JobID jobId) { - if (null != allocationId) { - switch (slot.getState()) { - case PENDING: - // we have a pending slot request --> check whether we have to reject it - PendingSlotRequest pendingSlotRequest = slot.getAssignedSlotRequest(); - - if (Objects.equals(pendingSlotRequest.getAllocationId(), allocationId)) { - // we can cancel the slot request because it has been fulfilled - cancelPendingSlotRequest(pendingSlotRequest); - - // remove the pending slot request, since it has been completed - pendingSlotRequests.remove(pendingSlotRequest.getAllocationId()); - - slot.completeAllocation(allocationId, jobId); - } else { - // we first have to free the slot in order to set a new allocationId - slot.clearPendingSlotRequest(); - // set the allocation id such that the slot won't be considered for the pending slot request - slot.updateAllocation(allocationId, jobId); - - // remove the pending request if any as it has been assigned - final PendingSlotRequest actualPendingSlotRequest = pendingSlotRequests.remove(allocationId); - - if (actualPendingSlotRequest != null) { - cancelPendingSlotRequest(actualPendingSlotRequest); - } - - // this will try to find a new slot for the request - rejectPendingSlotRequest( - pendingSlotRequest, - new Exception("Task manager reported slot " + slot.getSlotId() + " being already allocated.")); - } - - taskManagerRegistration.occupySlot(); - break; - case ALLOCATED: - if (!Objects.equals(allocationId, slot.getAllocationId())) { - slot.freeSlot(); - slot.updateAllocation(allocationId, jobId); - } - break; - case FREE: - // the slot is currently free --> it is stored in freeSlots - freeSlots.remove(slot.getSlotId()); - slot.updateAllocation(allocationId, jobId); - taskManagerRegistration.occupySlot(); - break; - } - - fulfilledSlotRequests.put(allocationId, slot.getSlotId()); - } else { - // no allocation reported - switch (slot.getState()) { - case FREE: - handleFreeSlot(slot); - break; - case PENDING: - // don't do anything because we still have a pending slot request - break; - case ALLOCATED: - AllocationID oldAllocation = slot.getAllocationId(); - slot.freeSlot(); - fulfilledSlotRequests.remove(oldAllocation); - taskManagerRegistration.freeSlot(); - - handleFreeSlot(slot); - break; - } - } - } - - /** - * Tries to allocate a slot for the given slot request. If there is no slot available, the - * resource manager is informed to allocate more resources and a timeout for the request is - * registered. - * - * @param pendingSlotRequest to allocate a slot for - * @throws ResourceManagerException if the resource manager cannot allocate more resource - */ - private void internalRequestSlot(PendingSlotRequest pendingSlotRequest) throws ResourceManagerException { - final ResourceProfile resourceProfile = pendingSlotRequest.getResourceProfile(); - TaskManagerSlot taskManagerSlot = findMatchingSlot(resourceProfile); - - if (taskManagerSlot != null) { - allocateSlot(taskManagerSlot, pendingSlotRequest); - } else { - Optional<PendingTaskManagerSlot> pendingTaskManagerSlotOptional = findFreeMatchingPendingTaskManagerSlot(resourceProfile); - - if (!pendingTaskManagerSlotOptional.isPresent()) { - pendingTaskManagerSlotOptional = allocateResource(resourceProfile); - } - - if (pendingTaskManagerSlotOptional.isPresent()) { - assignPendingTaskManagerSlot(pendingSlotRequest, pendingTaskManagerSlotOptional.get()); - } - else { - // request can not be fulfilled by any free slot or pending slot that can be allocated, - // check whether it can be fulfilled by allocated slots - if (failUnfulfillableRequest && !isFulfillableByRegisteredSlots(pendingSlotRequest.getResourceProfile())) { - throw new ResourceManagerException("Requested resource profile (" + - pendingSlotRequest.getResourceProfile() + ") is unfulfillable."); - } - } - } - } - - private Optional<PendingTaskManagerSlot> findFreeMatchingPendingTaskManagerSlot(ResourceProfile requiredResourceProfile) { - for (PendingTaskManagerSlot pendingTaskManagerSlot : pendingSlots.values()) { - if (pendingTaskManagerSlot.getAssignedPendingSlotRequest() == null && pendingTaskManagerSlot.getResourceProfile().isMatching(requiredResourceProfile)) { - return Optional.of(pendingTaskManagerSlot); - } - } - - return Optional.empty(); - } - - private boolean isFulfillableByRegisteredSlots(ResourceProfile resourceProfile) { - for (TaskManagerSlot slot : slots.values()) { - if (slot.getResourceProfile().isMatching(resourceProfile)) { - return true; - } - } - return false; - } - - private Optional<PendingTaskManagerSlot> allocateResource(ResourceProfile resourceProfile) throws ResourceManagerException { - final Collection<ResourceProfile> requestedSlots = resourceActions.allocateResource(resourceProfile); - - if (requestedSlots.isEmpty()) { - return Optional.empty(); - } else { - final Iterator<ResourceProfile> slotIterator = requestedSlots.iterator(); - final PendingTaskManagerSlot pendingTaskManagerSlot = new PendingTaskManagerSlot(slotIterator.next()); - pendingSlots.put(pendingTaskManagerSlot.getTaskManagerSlotId(), pendingTaskManagerSlot); - - while (slotIterator.hasNext()) { - final PendingTaskManagerSlot additionalPendingTaskManagerSlot = new PendingTaskManagerSlot(slotIterator.next()); - pendingSlots.put(additionalPendingTaskManagerSlot.getTaskManagerSlotId(), additionalPendingTaskManagerSlot); - } - - return Optional.of(pendingTaskManagerSlot); - } - } - - private void assignPendingTaskManagerSlot(PendingSlotRequest pendingSlotRequest, PendingTaskManagerSlot pendingTaskManagerSlot) { - pendingTaskManagerSlot.assignPendingSlotRequest(pendingSlotRequest); - pendingSlotRequest.assignPendingTaskManagerSlot(pendingTaskManagerSlot); - } - - /** - * Allocates the given slot for the given slot request. This entails sending a registration - * message to the task manager and treating failures. - * - * @param taskManagerSlot to allocate for the given slot request - * @param pendingSlotRequest to allocate the given slot for - */ - private void allocateSlot(TaskManagerSlot taskManagerSlot, PendingSlotRequest pendingSlotRequest) { - Preconditions.checkState(taskManagerSlot.getState() == TaskManagerSlot.State.FREE); - - TaskExecutorConnection taskExecutorConnection = taskManagerSlot.getTaskManagerConnection(); - TaskExecutorGateway gateway = taskExecutorConnection.getTaskExecutorGateway(); - - final CompletableFuture<Acknowledge> completableFuture = new CompletableFuture<>(); - final AllocationID allocationId = pendingSlotRequest.getAllocationId(); - final SlotID slotId = taskManagerSlot.getSlotId(); - final InstanceID instanceID = taskManagerSlot.getInstanceId(); - - taskManagerSlot.assignPendingSlotRequest(pendingSlotRequest); - pendingSlotRequest.setRequestFuture(completableFuture); - - returnPendingTaskManagerSlotIfAssigned(pendingSlotRequest); - - TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(instanceID); - - if (taskManagerRegistration == null) { - throw new IllegalStateException("Could not find a registered task manager for instance id " + - instanceID + '.'); - } - - taskManagerRegistration.markUsed(); - - // RPC call to the task manager - CompletableFuture<Acknowledge> requestFuture = gateway.requestSlot( - slotId, - pendingSlotRequest.getJobId(), - allocationId, - pendingSlotRequest.getTargetAddress(), - resourceManagerId, - taskManagerRequestTimeout); - - requestFuture.whenComplete( - (Acknowledge acknowledge, Throwable throwable) -> { - if (acknowledge != null) { - completableFuture.complete(acknowledge); - } else { - completableFuture.completeExceptionally(throwable); - } - }); - - completableFuture.whenCompleteAsync( - (Acknowledge acknowledge, Throwable throwable) -> { - try { - if (acknowledge != null) { - updateSlot(slotId, allocationId, pendingSlotRequest.getJobId()); - } else { - if (throwable instanceof SlotOccupiedException) { - SlotOccupiedException exception = (SlotOccupiedException) throwable; - updateSlot(slotId, exception.getAllocationId(), exception.getJobId()); - } else { - removeSlotRequestFromSlot(slotId, allocationId); - } - - if (!(throwable instanceof CancellationException)) { - handleFailedSlotRequest(slotId, allocationId, throwable); - } else { - LOG.debug("Slot allocation request {} has been cancelled.", allocationId, throwable); - } - } - } catch (Exception e) { - LOG.error("Error while completing the slot allocation.", e); - } - }, - mainThreadExecutor); - } - - private void returnPendingTaskManagerSlotIfAssigned(PendingSlotRequest pendingSlotRequest) { - final PendingTaskManagerSlot pendingTaskManagerSlot = pendingSlotRequest.getAssignedPendingTaskManagerSlot(); - if (pendingTaskManagerSlot != null) { - pendingTaskManagerSlot.unassignPendingSlotRequest(); - pendingSlotRequest.unassignPendingTaskManagerSlot(); - } - } - - /** - * Handles a free slot. It first tries to find a pending slot request which can be fulfilled. - * If there is no such request, then it will add the slot to the set of free slots. - * - * @param freeSlot to find a new slot request for - */ - private void handleFreeSlot(TaskManagerSlot freeSlot) { - Preconditions.checkState(freeSlot.getState() == TaskManagerSlot.State.FREE); - - PendingSlotRequest pendingSlotRequest = findMatchingRequest(freeSlot.getResourceProfile()); - - if (null != pendingSlotRequest) { - allocateSlot(freeSlot, pendingSlotRequest); - } else { - freeSlots.put(freeSlot.getSlotId(), freeSlot); - } - } - - /** - * Removes the given set of slots from the slot manager. - * - * @param slotsToRemove identifying the slots to remove from the slot manager - */ - private void removeSlots(Iterable<SlotID> slotsToRemove) { - for (SlotID slotId : slotsToRemove) { - removeSlot(slotId); - } - } - - /** - * Removes the given slot from the slot manager. - * - * @param slotId identifying the slot to remove - */ - private void removeSlot(SlotID slotId) { - TaskManagerSlot slot = slots.remove(slotId); - - if (null != slot) { - freeSlots.remove(slotId); - - if (slot.getState() == TaskManagerSlot.State.PENDING) { - // reject the pending slot request --> triggering a new allocation attempt - rejectPendingSlotRequest( - slot.getAssignedSlotRequest(), - new Exception("The assigned slot " + slot.getSlotId() + " was removed.")); - } - - AllocationID oldAllocationId = slot.getAllocationId(); - - if (oldAllocationId != null) { - fulfilledSlotRequests.remove(oldAllocationId); - - resourceActions.notifyAllocationFailure( - slot.getJobId(), - oldAllocationId, - new FlinkException("The assigned slot " + slot.getSlotId() + " was removed.")); - } - } else { - LOG.debug("There was no slot registered with slot id {}.", slotId); - } - } - - // --------------------------------------------------------------------------------------------- - // Internal request handling methods - // --------------------------------------------------------------------------------------------- - - /** - * Removes a pending slot request identified by the given allocation id from a slot identified - * by the given slot id. - * - * @param slotId identifying the slot - * @param allocationId identifying the presumable assigned pending slot request - */ - private void removeSlotRequestFromSlot(SlotID slotId, AllocationID allocationId) { - TaskManagerSlot taskManagerSlot = slots.get(slotId); - - if (null != taskManagerSlot) { - if (taskManagerSlot.getState() == TaskManagerSlot.State.PENDING && Objects.equals(allocationId, taskManagerSlot.getAssignedSlotRequest().getAllocationId())) { - - TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(taskManagerSlot.getInstanceId()); - - if (taskManagerRegistration == null) { - throw new IllegalStateException("Trying to remove slot request from slot for which there is no TaskManager " + taskManagerSlot.getInstanceId() + " is registered."); - } - - // clear the pending slot request - taskManagerSlot.clearPendingSlotRequest(); - - updateSlotState(taskManagerSlot, taskManagerRegistration, null, null); - } else { - LOG.debug("Ignore slot request removal for slot {}.", slotId); - } - } else { - LOG.debug("There was no slot with {} registered. Probably this slot has been already freed.", slotId); - } - } - - /** - * Handles a failed slot request. The slot manager tries to find a new slot fulfilling - * the resource requirements for the failed slot request. - * - * @param slotId identifying the slot which was assigned to the slot request before - * @param allocationId identifying the failed slot request - * @param cause of the failure - */ - private void handleFailedSlotRequest(SlotID slotId, AllocationID allocationId, Throwable cause) { - PendingSlotRequest pendingSlotRequest = pendingSlotRequests.get(allocationId); - - LOG.debug("Slot request with allocation id {} failed for slot {}.", allocationId, slotId, cause); - - if (null != pendingSlotRequest) { - pendingSlotRequest.setRequestFuture(null); - - try { - internalRequestSlot(pendingSlotRequest); - } catch (ResourceManagerException e) { - pendingSlotRequests.remove(allocationId); - - resourceActions.notifyAllocationFailure( - pendingSlotRequest.getJobId(), - allocationId, - e); - } - } else { - LOG.debug("There was not pending slot request with allocation id {}. Probably the request has been fulfilled or cancelled.", allocationId); - } - } - - /** - * Rejects the pending slot request by failing the request future with a - * {@link SlotAllocationException}. - * - * @param pendingSlotRequest to reject - * @param cause of the rejection - */ - private void rejectPendingSlotRequest(PendingSlotRequest pendingSlotRequest, Exception cause) { - CompletableFuture<Acknowledge> request = pendingSlotRequest.getRequestFuture(); - - if (null != request) { - request.completeExceptionally(new SlotAllocationException(cause)); - } else { - LOG.debug("Cannot reject pending slot request {}, since no request has been sent.", pendingSlotRequest.getAllocationId()); - } - } - - /** - * Cancels the given slot request. - * - * @param pendingSlotRequest to cancel - */ - private void cancelPendingSlotRequest(PendingSlotRequest pendingSlotRequest) { - CompletableFuture<Acknowledge> request = pendingSlotRequest.getRequestFuture(); - - returnPendingTaskManagerSlotIfAssigned(pendingSlotRequest); - - if (null != request) { - request.cancel(false); - } - } - - // --------------------------------------------------------------------------------------------- - // Internal timeout methods - // --------------------------------------------------------------------------------------------- + void setFailUnfulfillableRequest(boolean failUnfulfillableRequest); @VisibleForTesting - void checkTaskManagerTimeouts() { - if (!taskManagerRegistrations.isEmpty()) { - long currentTime = System.currentTimeMillis(); - - ArrayList<TaskManagerRegistration> timedOutTaskManagers = new ArrayList<>(taskManagerRegistrations.size()); - - // first retrieve the timed out TaskManagers - for (TaskManagerRegistration taskManagerRegistration : taskManagerRegistrations.values()) { - if (currentTime - taskManagerRegistration.getIdleSince() >= taskManagerTimeout.toMilliseconds()) { - // we collect the instance ids first in order to avoid concurrent modifications by the - // ResourceActions.releaseResource call - timedOutTaskManagers.add(taskManagerRegistration); - } - } - - // second we trigger the release resource callback which can decide upon the resource release - for (TaskManagerRegistration taskManagerRegistration : timedOutTaskManagers) { - if (waitResultConsumedBeforeRelease) { - releaseTaskExecutorIfPossible(taskManagerRegistration); - } else { - releaseTaskExecutor(taskManagerRegistration.getInstanceId()); - } - } - } - } - - private void releaseTaskExecutorIfPossible(TaskManagerRegistration taskManagerRegistration) { - long idleSince = taskManagerRegistration.getIdleSince(); - taskManagerRegistration - .getTaskManagerConnection() - .getTaskExecutorGateway() - .canBeReleased() - .thenAcceptAsync( - canBeReleased -> { - InstanceID timedOutTaskManagerId = taskManagerRegistration.getInstanceId(); - boolean stillIdle = idleSince == taskManagerRegistration.getIdleSince(); - if (stillIdle && canBeReleased) { - releaseTaskExecutor(timedOutTaskManagerId); - } - }, - mainThreadExecutor); - } - - private void releaseTaskExecutor(InstanceID timedOutTaskManagerId) { - final FlinkException cause = new FlinkException("TaskExecutor exceeded the idle timeout."); - LOG.debug("Release TaskExecutor {} because it exceeded the idle timeout.", timedOutTaskManagerId); - resourceActions.releaseResource(timedOutTaskManagerId, cause); - } - - private void checkSlotRequestTimeouts() { - if (!pendingSlotRequests.isEmpty()) { - long currentTime = System.currentTimeMillis(); - - Iterator<Map.Entry<AllocationID, PendingSlotRequest>> slotRequestIterator = pendingSlotRequests.entrySet().iterator(); - - while (slotRequestIterator.hasNext()) { - PendingSlotRequest slotRequest = slotRequestIterator.next().getValue(); - - if (currentTime - slotRequest.getCreationTimestamp() >= slotRequestTimeout.toMilliseconds()) { - slotRequestIterator.remove(); - - if (slotRequest.isAssigned()) { - cancelPendingSlotRequest(slotRequest); - } - - resourceActions.notifyAllocationFailure( - slotRequest.getJobId(), - slotRequest.getAllocationId(), - new TimeoutException("The allocation could not be fulfilled in time.")); - } - } - } - } - - // --------------------------------------------------------------------------------------------- - // Internal utility methods - // --------------------------------------------------------------------------------------------- - - private void internalUnregisterTaskManager(TaskManagerRegistration taskManagerRegistration) { - Preconditions.checkNotNull(taskManagerRegistration); - - removeSlots(taskManagerRegistration.getSlots()); - } - - private boolean checkDuplicateRequest(AllocationID allocationId) { - return pendingSlotRequests.containsKey(allocationId) || fulfilledSlotRequests.containsKey(allocationId); - } - - private void checkInit() { - Preconditions.checkState(started, "The slot manager has not been started."); - } - - // --------------------------------------------------------------------------------------------- - // Testing methods - // --------------------------------------------------------------------------------------------- - - @VisibleForTesting - TaskManagerSlot getSlot(SlotID slotId) { - return slots.get(slotId); - } - - @VisibleForTesting - PendingSlotRequest getSlotRequest(AllocationID allocationId) { - return pendingSlotRequests.get(allocationId); - } - - @VisibleForTesting - boolean isTaskManagerIdle(InstanceID instanceId) { - TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(instanceId); - - if (null != taskManagerRegistration) { - return taskManagerRegistration.isIdle(); - } else { - return false; - } - } - - @VisibleForTesting - public void unregisterTaskManagersAndReleaseResources() { - Iterator<Map.Entry<InstanceID, TaskManagerRegistration>> taskManagerRegistrationIterator = - taskManagerRegistrations.entrySet().iterator(); - - while (taskManagerRegistrationIterator.hasNext()) { - TaskManagerRegistration taskManagerRegistration = - taskManagerRegistrationIterator.next().getValue(); - - taskManagerRegistrationIterator.remove(); - - internalUnregisterTaskManager(taskManagerRegistration); - - resourceActions.releaseResource(taskManagerRegistration.getInstanceId(), new FlinkException("Triggering of SlotManager#unregisterTaskManagersAndReleaseResources.")); - } - } + void unregisterTaskManagersAndReleaseResources(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java similarity index 97% copy from flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java copy to flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java index 71f3df6..376d030 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java @@ -62,18 +62,10 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; /** - * The slot manager is responsible for maintaining a view on all registered task manager slots, - * their allocation and all pending slot requests. Whenever a new slot is registered or and - * allocated slot is freed, then it tries to fulfill another pending slot request. Whenever there - * are not enough slots available the slot manager will notify the resource manager about it via - * {@link ResourceActions#allocateResource(ResourceProfile)}. - * - * <p>In order to free resources and avoid resource leaks, idling task managers (task managers whose - * slots are currently not used) and pending slot requests time out triggering their release and - * failure, respectively. + * Implementation of {@link SlotManager}. */ -public class SlotManager implements AutoCloseable { - private static final Logger LOG = LoggerFactory.getLogger(SlotManager.class); +public class SlotManagerImpl implements SlotManager { + private static final Logger LOG = LoggerFactory.getLogger(SlotManagerImpl.class); /** Scheduled executor for timeouts. */ private final ScheduledExecutor scheduledExecutor; @@ -129,9 +121,9 @@ public class SlotManager implements AutoCloseable { * A slot request is considered unfulfillable if it cannot be fulfilled by neither a slot that is already registered * (including allocated ones) nor a pending slot that the {@link ResourceActions} can allocate. * */ - private boolean failUnfulfillableRequest = false; + private boolean failUnfulfillableRequest = true; - public SlotManager( + public SlotManagerImpl( ScheduledExecutor scheduledExecutor, Time taskManagerRequestTimeout, Time slotRequestTimeout, @@ -160,10 +152,12 @@ public class SlotManager implements AutoCloseable { started = false; } + @Override public int getNumberRegisteredSlots() { return slots.size(); } + @Override public int getNumberRegisteredSlotsOf(InstanceID instanceId) { TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(instanceId); @@ -174,10 +168,12 @@ public class SlotManager implements AutoCloseable { } } + @Override public int getNumberFreeSlots() { return freeSlots.size(); } + @Override public int getNumberFreeSlotsOf(InstanceID instanceId) { TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(instanceId); @@ -188,20 +184,23 @@ public class SlotManager implements AutoCloseable { } } + @Override public int getNumberPendingTaskManagerSlots() { return pendingSlots.size(); } + @Override public int getNumberPendingSlotRequests() { return pendingSlotRequests.size(); } + @Override public boolean isFailingUnfulfillableRequest() { return failUnfulfillableRequest; } @VisibleForTesting - int getNumberAssignedPendingTaskManagerSlots() { + public int getNumberAssignedPendingTaskManagerSlots() { return (int) pendingSlots.values().stream().filter(slot -> slot.getAssignedPendingSlotRequest() != null).count(); } @@ -216,7 +215,9 @@ public class SlotManager implements AutoCloseable { * @param newMainThreadExecutor to use to run code in the ResourceManager's main thread * @param newResourceActions to use for resource (de-)allocations */ - public void start(ResourceManagerId newResourceManagerId, Executor newMainThreadExecutor, ResourceActions newResourceActions) { + @Override + public void start(ResourceManagerId newResourceManagerId, Executor newMainThreadExecutor, + ResourceActions newResourceActions) { LOG.info("Starting the SlotManager."); this.resourceManagerId = Preconditions.checkNotNull(newResourceManagerId); @@ -243,6 +244,7 @@ public class SlotManager implements AutoCloseable { /** * Suspends the component. This clears the internal state of the slot manager. */ + @Override public void suspend() { LOG.info("Suspending the SlotManager."); @@ -297,6 +299,7 @@ public class SlotManager implements AutoCloseable { * @return true if the slot request was registered; false if the request is a duplicate * @throws SlotManagerException if the slot request failed (e.g. not enough resources left) */ + @Override public boolean registerSlotRequest(SlotRequest slotRequest) throws SlotManagerException { checkInit(); @@ -329,6 +332,7 @@ public class SlotManager implements AutoCloseable { * @param allocationId identifying the pending slot request * @return True if a pending slot request was found; otherwise false */ + @Override public boolean unregisterSlotRequest(AllocationID allocationId) { checkInit(); @@ -354,6 +358,7 @@ public class SlotManager implements AutoCloseable { * @param taskExecutorConnection for the new task manager * @param initialSlotReport for the new task manager */ + @Override public void registerTaskManager(final TaskExecutorConnection taskExecutorConnection, SlotReport initialSlotReport) { checkInit(); @@ -396,6 +401,7 @@ public class SlotManager implements AutoCloseable { * @param instanceId identifying the task manager to unregister * @return True if there existed a registered task manager with the given instance id */ + @Override public boolean unregisterTaskManager(InstanceID instanceId) { checkInit(); @@ -421,6 +427,7 @@ public class SlotManager implements AutoCloseable { * @param slotReport containing the status for all of its slots * @return true if the slot status has been updated successfully, otherwise false */ + @Override public boolean reportSlotStatus(InstanceID instanceId, SlotReport slotReport) { checkInit(); @@ -449,6 +456,7 @@ public class SlotManager implements AutoCloseable { * @param slotId identifying the slot to free * @param allocationId with which the slot is presumably allocated */ + @Override public void freeSlot(SlotID slotId, AllocationID allocationId) { checkInit(); @@ -478,6 +486,7 @@ public class SlotManager implements AutoCloseable { } } + @Override public void setFailUnfulfillableRequest(boolean failUnfulfillableRequest) { if (!this.failUnfulfillableRequest && failUnfulfillableRequest) { // fail unfulfillable pending requests @@ -1181,6 +1190,7 @@ public class SlotManager implements AutoCloseable { } } + @Override @VisibleForTesting public void unregisterTaskManagersAndReleaseResources() { Iterator<Map.Entry<InstanceID, TaskManagerRegistration>> taskManagerRegistrationIterator = diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerBuilder.java index cc99370..15b65ed 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerBuilder.java @@ -22,7 +22,7 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.concurrent.ScheduledExecutor; import org.apache.flink.runtime.testingUtils.TestingUtils; -/** Builder for {@link SlotManager}. */ +/** Builder for {@link SlotManagerImpl}. */ public class SlotManagerBuilder { private ScheduledExecutor scheduledExecutor; private Time taskManagerRequestTimeout; @@ -67,8 +67,8 @@ public class SlotManagerBuilder { return this; } - public SlotManager build() { - return new SlotManager( + public SlotManagerImpl build() { + return new SlotManagerImpl( scheduledExecutor, taskManagerRequestTimeout, slotRequestTimeout, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java index c358866..b155ed6 100755 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java @@ -95,7 +95,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; /** - * Tests for the {@link SlotManager}. + * Tests for the {@link SlotManagerImpl}. */ public class SlotManagerTest extends TestLogger { @@ -118,7 +118,7 @@ public class SlotManagerTest extends TestLogger { final SlotStatus slotStatus2 = new SlotStatus(slotId2, resourceProfile); final SlotReport slotReport = new SlotReport(Arrays.asList(slotStatus1, slotStatus2)); - try (SlotManager slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) { + try (SlotManagerImpl slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) { slotManager.registerTaskManager(taskManagerConnection, slotReport); assertTrue("The number registered slots does not equal the expected number.", 2 == slotManager.getNumberRegisteredSlots()); @@ -162,7 +162,7 @@ public class SlotManagerTest extends TestLogger { resourceProfile, "foobar"); - try (SlotManager slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) { + try (SlotManagerImpl slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) { slotManager.registerTaskManager(taskManagerConnection, slotReport); assertTrue("The number registered slots does not equal the expected number.", 2 == slotManager.getNumberRegisteredSlots()); @@ -265,7 +265,7 @@ public class SlotManagerTest extends TestLogger { ResourceActions resourceManagerActions = new TestingResourceActionsBuilder().build(); - try (SlotManager slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) { + try (SlotManagerImpl slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) { final CompletableFuture<Tuple5<SlotID, JobID, AllocationID, String, ResourceManagerId>> requestFuture = new CompletableFuture<>(); // accept an incoming slot request final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder() @@ -318,7 +318,7 @@ public class SlotManagerTest extends TestLogger { final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceID, taskExecutorGateway); - try (SlotManager slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) { + try (SlotManagerImpl slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) { slotManager.registerTaskManager(taskManagerConnection, slotReport); TaskManagerSlot slot = slotManager.getSlot(slotId); @@ -375,7 +375,7 @@ public class SlotManagerTest extends TestLogger { final SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile); final SlotReport slotReport = new SlotReport(slotStatus); - try (SlotManager slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) { + try (SlotManagerImpl slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) { assertTrue("The slot request should be accepted", slotManager.registerSlotRequest(slotRequest)); @@ -415,7 +415,7 @@ public class SlotManagerTest extends TestLogger { final SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile, jobId, allocationId); final SlotReport slotReport = new SlotReport(slotStatus); - try (SlotManager slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) { + try (SlotManagerImpl slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) { slotManager.registerTaskManager( taskExecutorConnection, @@ -521,7 +521,7 @@ public class SlotManagerTest extends TestLogger { final SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile1); final SlotReport slotReport = new SlotReport(slotStatus); - try (SlotManager slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) { + try (SlotManagerImpl slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) { slotManager.registerTaskManager(taskManagerConnection, slotReport); assertTrue(slotManager.registerSlotRequest(slotRequest1)); @@ -563,7 +563,7 @@ public class SlotManagerTest extends TestLogger { final SlotStatus slotStatus = new SlotStatus(slotId, new ResourceProfile(2.0, 2)); final SlotReport slotReport = new SlotReport(slotStatus); - try (SlotManager slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) { + try (SlotManagerImpl slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) { slotManager.registerTaskManager(taskManagerConnection, slotReport); assertTrue(slotManager.registerSlotRequest(slotRequest1)); @@ -641,7 +641,7 @@ public class SlotManagerTest extends TestLogger { final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class); final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceId, taskExecutorGateway); - try (SlotManager slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) { + try (SlotManagerImpl slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) { // check that we don't have any slots registered assertTrue(0 == slotManager.getNumberRegisteredSlots()); @@ -747,7 +747,7 @@ public class SlotManagerTest extends TestLogger { final SlotStatus slotStatus2 = new SlotStatus(slotId2, resourceProfile); final SlotReport slotReport = new SlotReport(Arrays.asList(slotStatus1, slotStatus2)); - try (SlotManager slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) { + try (SlotManagerImpl slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) { slotManager.registerTaskManager(taskManagerConnection, slotReport); @@ -827,7 +827,7 @@ public class SlotManagerTest extends TestLogger { final Executor mainThreadExecutor = TestingUtils.defaultExecutor(); - try (final SlotManager slotManager = SlotManagerBuilder.newBuilder().build()) { + try (final SlotManagerImpl slotManager = SlotManagerBuilder.newBuilder().build()) { slotManager.start(resourceManagerId, mainThreadExecutor, resourceManagerActions); @@ -943,7 +943,7 @@ public class SlotManagerTest extends TestLogger { final Executor mainThreadExecutor = TestingUtils.defaultExecutor(); - try (final SlotManager slotManager = SlotManagerBuilder.newBuilder() + try (final SlotManagerImpl slotManager = SlotManagerBuilder.newBuilder() .setTaskManagerTimeout(Time.of(taskManagerTimeout, TimeUnit.MILLISECONDS)) .build()) { @@ -1046,7 +1046,7 @@ public class SlotManagerTest extends TestLogger { final TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway(); final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(taskManagerId, taskExecutorGateway); - try (final SlotManager slotManager = SlotManagerBuilder.newBuilder().build()) { + try (final SlotManagerImpl slotManager = SlotManagerBuilder.newBuilder().build()) { slotManager.start(ResourceManagerId.generate(), Executors.directExecutor(), resourceActions); @@ -1095,7 +1095,7 @@ public class SlotManagerTest extends TestLogger { */ @Test public void testSlotRequestFailure() throws Exception { - try (final SlotManager slotManager = createSlotManager(ResourceManagerId.generate(), + try (final SlotManagerImpl slotManager = createSlotManager(ResourceManagerId.generate(), new TestingResourceActionsBuilder().build())) { final SlotRequest slotRequest = new SlotRequest(new JobID(), new AllocationID(), ResourceProfile.UNKNOWN, "foobar"); @@ -1150,7 +1150,7 @@ public class SlotManagerTest extends TestLogger { */ @Test public void testSlotRequestRemovedIfTMReportAllocation() throws Exception { - try (final SlotManager slotManager = createSlotManager(ResourceManagerId.generate(), + try (final SlotManagerImpl slotManager = createSlotManager(ResourceManagerId.generate(), new TestingResourceActionsBuilder().build())) { final JobID jobID = new JobID(); @@ -1326,8 +1326,8 @@ public class SlotManagerTest extends TestLogger { return new SlotRequest(jobId, new AllocationID(), resourceProfile, "foobar1"); } - private SlotManager createSlotManager(ResourceManagerId resourceManagerId, ResourceActions resourceManagerActions) { - SlotManager slotManager = SlotManagerBuilder.newBuilder().build(); + private SlotManagerImpl createSlotManager(ResourceManagerId resourceManagerId, ResourceActions resourceManagerActions) { + SlotManagerImpl slotManager = SlotManagerBuilder.newBuilder().build(); slotManager.start(resourceManagerId, Executors.directExecutor(), resourceManagerActions); return slotManager; } @@ -1348,7 +1348,7 @@ public class SlotManagerTest extends TestLogger { })) .build(); - try (final SlotManager slotManager = createSlotManager( + try (final SlotManagerImpl slotManager = createSlotManager( ResourceManagerId.generate(), testingResourceActions)) { @@ -1377,7 +1377,7 @@ public class SlotManagerTest extends TestLogger { final TestingResourceActions resourceActions = new TestingResourceActionsBuilder() .setAllocateResourceFunction(convert(value -> numberSlots)) .build(); - try (final SlotManager slotManager = createSlotManager(ResourceManagerId.generate(), resourceActions)) { + try (final SlotManagerImpl slotManager = createSlotManager(ResourceManagerId.generate(), resourceActions)) { final JobID jobId = new JobID(); final SlotRequest slotRequest = createSlotRequest(jobId); @@ -1403,7 +1403,7 @@ public class SlotManagerTest extends TestLogger { .setAllocateResourceFunction(convert(value -> numberSlots)) .build(); - try (final SlotManager slotManager = createSlotManager(ResourceManagerId.generate(), resourceActions)) { + try (final SlotManagerImpl slotManager = createSlotManager(ResourceManagerId.generate(), resourceActions)) { final JobID jobId = new JobID(); assertThat(slotManager.registerSlotRequest(createSlotRequest(jobId)), is(true)); @@ -1438,7 +1438,7 @@ public class SlotManagerTest extends TestLogger { .setAllocateResourceFunction(convert(value -> numberSlots)) .build(); - try (final SlotManager slotManager = createSlotManager(ResourceManagerId.generate(), resourceActions)) { + try (final SlotManagerImpl slotManager = createSlotManager(ResourceManagerId.generate(), resourceActions)) { final JobID jobId = new JobID(); final ResourceProfile requestedSlotProfile = new ResourceProfile(1.0, 1); @@ -1469,7 +1469,7 @@ public class SlotManagerTest extends TestLogger { .setAllocateResourceFunction(convert(value -> numberSlots)) .build(); - try (final SlotManager slotManager = createSlotManager(ResourceManagerId.generate(), resourceActions)) { + try (final SlotManagerImpl slotManager = createSlotManager(ResourceManagerId.generate(), resourceActions)) { final JobID jobId = new JobID(); assertThat(slotManager.registerSlotRequest(createSlotRequest(jobId)), is(true)); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerReleaseInSlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerReleaseInSlotManagerTest.java index 25f7ccb..8252c48 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerReleaseInSlotManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerReleaseInSlotManagerTest.java @@ -104,7 +104,7 @@ public class TaskManagerReleaseInSlotManagerTest extends TestLogger { */ @Test public void testTaskManagerIsNotReleasedBeforeItCanBe() throws Exception { - try (SlotManager slotManager = createAndStartSlotManagerWithTM()) { + try (SlotManagerImpl slotManager = createAndStartSlotManagerWithTM()) { checkTaskManagerTimeoutWithCustomCanBeReleasedResponse(slotManager, false); verifyTmReleased(false); @@ -118,7 +118,7 @@ public class TaskManagerReleaseInSlotManagerTest extends TestLogger { */ @Test public void testTaskManagerIsNotReleasedInCaseOfConcurrentAllocation() throws Exception { - try (SlotManager slotManager = createAndStartSlotManagerWithTM()) { + try (SlotManagerImpl slotManager = createAndStartSlotManagerWithTM()) { checkTaskManagerTimeoutWithCustomCanBeReleasedResponse(slotManager, true, () -> { // Allocate and free slot between triggering TM.canBeReleased request and receiving response. // There can be potentially newly unreleased partitions, therefore TM can not be released yet. @@ -134,8 +134,8 @@ public class TaskManagerReleaseInSlotManagerTest extends TestLogger { } } - private SlotManager createAndStartSlotManagerWithTM() { - SlotManager slotManager = SlotManagerBuilder + private SlotManagerImpl createAndStartSlotManagerWithTM() { + SlotManagerImpl slotManager = SlotManagerBuilder .newBuilder() .setScheduledExecutor(mainThreadExecutor) .setTaskManagerTimeout(Time.milliseconds(0L)) @@ -146,13 +146,13 @@ public class TaskManagerReleaseInSlotManagerTest extends TestLogger { } private void checkTaskManagerTimeoutWithCustomCanBeReleasedResponse( - SlotManager slotManager, + SlotManagerImpl slotManager, boolean canBeReleased) throws Exception { checkTaskManagerTimeoutWithCustomCanBeReleasedResponse(slotManager, canBeReleased, () -> {}); } private void checkTaskManagerTimeoutWithCustomCanBeReleasedResponse( - SlotManager slotManager, + SlotManagerImpl slotManager, boolean canBeReleased, RunnableWithException doAfterCheckTriggerBeforeCanBeReleasedResponse) throws Exception { canBeReleasedFuture.set(new CompletableFuture<>());