tillrohrmann commented on a change in pull request #10682: 
[FLINK-15247][Runtime] Wait for all slots to be free before task executor 
services shutdown upon stopping
URL: https://github.com/apache/flink/pull/10682#discussion_r369022521
 
 

 ##########
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImpl.java
 ##########
 @@ -0,0 +1,777 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.slot;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceBudgetManager;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.runtime.taskexecutor.SlotStatus;
+import org.apache.flink.runtime.taskmanager.Task;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
+
+/**
+ * Default implementation of {@link TaskSlotTable}.
+ */
+public class TaskSlotTableImpl implements TaskSlotTable {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(TaskSlotTableImpl.class);
+
+       /**
+        * Number of slots in static slot allocation.
+        * If a slot is requested with an index, the requested index must be within
+        * the range of [0, numberSlots).
+        * When generating a slot report, we should always generate slots with
+        * indices in [0, numberSlots), even if the slot does not exist.
+        */
+       private final int numberSlots;
+
+       /** Slot resource profile for static slot allocation. */
+       private final ResourceProfile defaultSlotResourceProfile;
+
+       /** Page size for memory manager. */
+       private final int memoryPageSize;
+
+       /** Timer service used to time out allocated slots. */
+       private final TimerService<AllocationID> timerService;
+
+       /** The list of all task slots. */
+       private final Map<Integer, TaskSlot> taskSlots;
+
+       /** Mapping from allocation id to task slot. */
+       private final Map<AllocationID, TaskSlot> allocatedSlots;
+
+       /** Mapping from execution attempt id to task and task slot. */
+       private final Map<ExecutionAttemptID, TaskSlotMapping> taskSlotMappings;
+
+       /** Mapping from job id to allocated slots for a job. */
+       private final Map<JobID, Set<AllocationID>> slotsPerJob;
+
+       /** Interface for slot actions, such as freeing them or timing them 
out. */
+       private SlotActions slotActions;
+
+       /** The table state. */
+       private volatile State state;
+
+       private final ResourceBudgetManager budgetManager;
+
+       /** The closing future is completed when all slots are freed and the state is closed. */
+       private CompletableFuture<Void> closingFuture;
+
+       /** {@link Executor} to schedule internal calls to the main thread. */
+       private Executor mainThreadExecutor;
+
+       /**
+        * Creates a task slot table in the {@code CREATED} state with empty slot bookkeeping.
+        *
+        * @param numberSlots number of statically indexed slots; must be greater than 0
+        * @param totalAvailableResourceProfile total budget handed to the {@link ResourceBudgetManager}; must not be null
+        * @param defaultSlotResourceProfile resource profile used for statically indexed slots; must not be null
+        * @param memoryPageSize page size passed on to newly created task slots
+        * @param timerService timer service used to register slot timeouts; must not be null
+        */
+       public TaskSlotTableImpl(
+               final int numberSlots,
+               final ResourceProfile totalAvailableResourceProfile,
+               final ResourceProfile defaultSlotResourceProfile,
+               final int memoryPageSize,
+               final TimerService<AllocationID> timerService) {
+
+               Preconditions.checkArgument(numberSlots > 0, "The number of task slots must be greater than 0.");
+
+               // static slot configuration
+               this.numberSlots = numberSlots;
+               this.defaultSlotResourceProfile = Preconditions.checkNotNull(defaultSlotResourceProfile);
+               this.memoryPageSize = memoryPageSize;
+               this.taskSlots = new HashMap<>(numberSlots);
+
+               // collaborators
+               this.timerService = Preconditions.checkNotNull(timerService);
+               this.budgetManager = new ResourceBudgetManager(
+                       Preconditions.checkNotNull(totalAvailableResourceProfile));
+
+               // bookkeeping maps, all initially empty
+               this.allocatedSlots = new HashMap<>(numberSlots);
+               this.taskSlotMappings = new HashMap<>(4 * numberSlots);
+               this.slotsPerJob = new HashMap<>(4);
+
+               // slot actions are only supplied by start(...)
+               this.slotActions = null;
+               this.state = State.CREATED;
+       }
+
+       /**
+        * Moves the table from {@code CREATED} to {@code RUNNING}, storing the given slot actions
+        * and main thread executor and starting the timer service with this table.
+        *
+        * @param initialSlotActions slot actions to use; must not be null
+        * @param mainThreadExecutor executor used for internal calls to the main thread; must not be null
+        */
+       @Override
+       public void start(SlotActions initialSlotActions, Executor mainThreadExecutor) {
+               // starting is only legal directly after construction
+               final boolean justCreated = state == State.CREATED;
+               Preconditions.checkState(
+                       justCreated,
+                       "The %s has to be just created before starting",
+                       TaskSlotTableImpl.class.getSimpleName());
+
+               this.slotActions = Preconditions.checkNotNull(initialSlotActions);
+               this.mainThreadExecutor = Preconditions.checkNotNull(mainThreadExecutor);
+
+               timerService.start(this);
+               state = State.RUNNING;
+       }
+
+       /**
+        * Closes the table. A table that was never started is closed immediately; a running table
+        * switches to {@code CLOSING}, frees every allocated slot and becomes {@code CLOSED} once
+        * all slot closing futures have completed. In any other state the previously stored
+        * closing future is returned unchanged.
+        *
+        * @return the closing future of this table
+        */
+       @Override
+       public CompletableFuture<Void> closeAsync() {
+               if (state == State.CREATED) {
+                       state = State.CLOSED;
+                       closingFuture = CompletableFuture.completedFuture(null);
+                       return closingFuture;
+               }
+
+               if (state == State.RUNNING) {
+                       state = State.CLOSING;
+                       final FlinkException cause = new FlinkException("Closing task slot table");
+
+                       // iterate over a snapshot: freeSlot may remove entries from allocatedSlots
+                       final List<TaskSlot> slotsToFree = new ArrayList<>(allocatedSlots.values());
+                       final List<CompletableFuture<Void>> slotClosingFutures = new ArrayList<>(slotsToFree.size());
+                       for (TaskSlot slot : slotsToFree) {
+                               slotClosingFutures.add(freeSlot(slot, cause));
+                       }
+
+                       closingFuture = FutureUtils
+                               .waitForAll(slotClosingFutures)
+                               .thenRunAsync(
+                                       () -> {
+                                               state = State.CLOSED;
+                                               timerService.stop();
+                                       },
+                                       mainThreadExecutor);
+               }
+
+               return closingFuture;
+       }
+
+       /**
+        * Tells whether the table is in state {@code CLOSED} and the memory manager of every slot
+        * still held in {@code taskSlots} or {@code allocatedSlots} reports being shut down.
+        */
+       @VisibleForTesting
+       public boolean isClosed() {
+               if (state != State.CLOSED) {
+                       return false;
+               }
+
+               for (TaskSlot slot : taskSlots.values()) {
+                       if (!slot.getMemoryManager().isShutdown()) {
+                               return false;
+                       }
+               }
+
+               for (TaskSlot slot : allocatedSlots.values()) {
+                       if (!slot.getMemoryManager().isShutdown()) {
+                               return false;
+                       }
+               }
+
+               return true;
+       }
+
+       /**
+        * Returns a read-only view of the allocation ids registered for the given job, or an
+        * empty set if the job has no entry in {@code slotsPerJob}.
+        */
+       @Override
+       public Set<AllocationID> getAllocationIdsPerJob(JobID jobId) {
+               final Set<AllocationID> idsOfJob = slotsPerJob.get(jobId);
+
+               return idsOfJob != null
+                       ? Collections.unmodifiableSet(idsOfJob)
+                       : Collections.emptySet();
+       }
+
+       // ---------------------------------------------------------------------
+       // Slot report methods
+       // ---------------------------------------------------------------------
+
+       /**
+        * Builds a slot report containing one status per static index in [0, numberSlots),
+        * followed by one status for every allocated slot with a negative index.
+        *
+        * @param resourceId resource id used to build the slot ids of the report
+        * @return the slot report
+        */
+       @Override
+       public SlotReport createSlotReport(ResourceID resourceId) {
+               final List<SlotStatus> statuses = new ArrayList<>();
+
+               // static slots: report every index, using the default profile where no slot exists
+               for (int index = 0; index < numberSlots; index++) {
+                       final SlotID slotId = new SlotID(resourceId, index);
+
+                       if (!taskSlots.containsKey(index)) {
+                               statuses.add(new SlotStatus(slotId, defaultSlotResourceProfile, null, null));
+                               continue;
+                       }
+
+                       final TaskSlot staticSlot = taskSlots.get(index);
+                       statuses.add(new SlotStatus(
+                               slotId,
+                               staticSlot.getResourceProfile(),
+                               staticSlot.getJobId(),
+                               staticSlot.getAllocationId()));
+               }
+
+               // allocated slots with a negative index get a freshly generated dynamic slot id
+               for (TaskSlot allocatedSlot : allocatedSlots.values()) {
+                       if (allocatedSlot.getIndex() >= 0) {
+                               continue;
+                       }
+
+                       final SlotID dynamicSlotId = SlotID.generateDynamicSlotID(resourceId);
+                       statuses.add(new SlotStatus(
+                               dynamicSlotId,
+                               allocatedSlot.getResourceProfile(),
+                               allocatedSlot.getJobId(),
+                               allocatedSlot.getAllocationId()));
+               }
+
+               return new SlotReport(statuses);
+       }
+
+       // ---------------------------------------------------------------------
+       // Slot methods
+       // ---------------------------------------------------------------------
+
+       /**
+        * Allocates a slot using the default slot resource profile; delegates to the overload
+        * that takes an explicit {@link ResourceProfile}.
+        */
+       @Override
+       @VisibleForTesting
+       public boolean allocateSlot(int index, JobID jobId, AllocationID allocationId, Time slotTimeout) {
+               final ResourceProfile profile = defaultSlotResourceProfile;
+               return allocateSlot(index, jobId, allocationId, profile, slotTimeout);
+       }
+
+       @Override
+       public boolean allocateSlot(int index, JobID jobId, AllocationID 
allocationId, ResourceProfile resourceProfile, Time slotTimeout) {
+               checkRunning();
+
+               Preconditions.checkArgument(index < numberSlots);
+
+               TaskSlot taskSlot = allocatedSlots.get(allocationId);
+               if (taskSlot != null) {
+                       LOG.info("Allocation ID {} is already allocated in 
{}.", allocationId, taskSlot);
+                       return false;
+               }
+
+               if (taskSlots.containsKey(index)) {
+                       TaskSlot duplicatedTaskSlot = taskSlots.get(index);
+                       LOG.info("Slot with index {} already exist, with 
resource profile {}, job id {} and allocation id {}.",
+                               index,
+                               duplicatedTaskSlot.getResourceProfile(),
+                               duplicatedTaskSlot.getJobId(),
+                               duplicatedTaskSlot.getAllocationId());
+                       return duplicatedTaskSlot.getJobId().equals(jobId) &&
+                               
duplicatedTaskSlot.getAllocationId().equals(allocationId);
+               } else if (allocatedSlots.containsKey(allocationId)) {
+                       return true;
+               }
+
+               resourceProfile = index >= 0 ? defaultSlotResourceProfile : 
resourceProfile;
+
+               if (!budgetManager.reserve(resourceProfile)) {
+                       LOG.info("Cannot allocate the requested resources. 
Trying to allocate {}, "
+                                       + "while the currently remaining 
available resources are {}, total is {}.",
+                               resourceProfile,
+                               budgetManager.getAvailableBudget(),
+                               budgetManager.getTotalBudget());
+                       return false;
+               }
+
+               taskSlot = new TaskSlot(index, resourceProfile, memoryPageSize, 
jobId, allocationId);
+               if (index >= 0) {
+                       taskSlots.put(index, taskSlot);
+               }
+
+               // update the allocation id to task slot map
+               allocatedSlots.put(allocationId, taskSlot);
+
+               // register a timeout for this slot since it's in state 
allocated
+               timerService.registerTimeout(allocationId, 
slotTimeout.getSize(), slotTimeout.getUnit());
+
+               // add this slot to the set of job slots
+               Set<AllocationID> slots = slotsPerJob.get(jobId);
+
+               if (slots == null) {
+                       slots = new HashSet<>(4);
+                       slotsPerJob.put(jobId, slots);
+               }
+
+               slots.add(allocationId);
+
+               return true;
+       }
+
+       /**
+        * Marks the slot of the given allocation id as active and, on success, unregisters the
+        * timeout registered for that allocation id.
+        *
+        * @return {@code true} if the slot could be marked active, {@code false} otherwise
+        * @throws SlotNotFoundException if no slot is found for the allocation id
+        */
+       @Override
+       public boolean markSlotActive(AllocationID allocationId) throws SlotNotFoundException {
+               checkRunning();
+
+               final TaskSlot slot = getTaskSlot(allocationId);
+               if (slot == null) {
+                       throw new SlotNotFoundException(allocationId);
+               }
+
+               if (!slot.markActive()) {
+                       return false;
+               }
+
+               LOG.info("Activate slot {}.", allocationId);
+
+               // an active slot must no longer time out
+               timerService.unregisterTimeout(allocationId);
+               return true;
+       }
+
+       /**
+        * Marks the slot of the given allocation id as inactive and, on success, registers a
+        * timeout for it.
+        *
+        * @return {@code true} if the slot could be marked inactive, {@code false} otherwise
+        * @throws SlotNotFoundException if no slot is found for the allocation id
+        */
+       @Override
+       public boolean markSlotInactive(AllocationID allocationId, Time slotTimeout) throws SlotNotFoundException {
+               checkStarted();
+
+               final TaskSlot slot = getTaskSlot(allocationId);
+               if (slot == null) {
+                       throw new SlotNotFoundException(allocationId);
+               }
+
+               if (!slot.markInactive()) {
+                       return false;
+               }
+
+               // the inactive slot is freed through this timeout
+               timerService.registerTimeout(allocationId, slotTimeout.getSize(), slotTimeout.getUnit());
+               return true;
+       }
+
+       @Override
+       public int freeSlot(AllocationID allocationId) throws 
SlotNotFoundException {
+               return freeSlot(allocationId, new Exception("The task slot of 
this task is being freed."));
+       }
+
+       /**
+        * Frees the slot of the given allocation id.
+        *
+        * @return the index of the slot if its closing future is already done, otherwise -1
+        * @throws SlotNotFoundException if no slot is found for the allocation id
+        */
+       @Override
+       public int freeSlot(AllocationID allocationId, Throwable cause) throws SlotNotFoundException {
+               checkStarted();
+
+               final TaskSlot slot = getTaskSlot(allocationId);
+               if (slot == null) {
+                       throw new SlotNotFoundException(allocationId);
+               }
+
+               final CompletableFuture<Void> slotClosed = freeSlot(slot, cause);
+               if (slotClosed.isDone()) {
+                       return slot.getIndex();
+               }
+               return -1;
+       }
+
+       /**
+        * Frees the given slot. If the slot is empty, all bookkeeping entries that refer to it are
+        * removed and its reserved resources are released. In every case the slot is asked to
+        * close with the given cause.
+        *
+        * @return the future returned by the slot's {@code closeAsync(cause)}
+        */
+       private CompletableFuture<Void> freeSlot(TaskSlot taskSlot, Throwable cause) {
+               final AllocationID slotAllocationId = taskSlot.getAllocationId();
+
+               // the cause is only logged on debug level
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("Free slot {}.", taskSlot, cause);
+               } else {
+                       LOG.info("Free slot {}.", taskSlot);
+               }
+
+               final JobID owningJob = taskSlot.getJobId();
+
+               if (taskSlot.isEmpty()) {
+                       // drop the allocation id mapping and any pending timeout
+                       allocatedSlots.remove(slotAllocationId);
+                       timerService.unregisterTimeout(slotAllocationId);
+
+                       final Set<AllocationID> jobSlots = slotsPerJob.get(owningJob);
+                       if (jobSlots == null) {
+                               throw new IllegalStateException("There are no more slots allocated for the job " + owningJob +
+                                       ". This indicates a programming bug.");
+                       }
+
+                       // forget the job entirely once its last slot is gone
+                       jobSlots.remove(slotAllocationId);
+                       if (jobSlots.isEmpty()) {
+                               slotsPerJob.remove(owningJob);
+                       }
+
+                       taskSlots.remove(taskSlot.getIndex());
+                       budgetManager.release(taskSlot.getResourceProfile());
+               }
+
+               return taskSlot.closeAsync(cause);
+       }
+
+       /**
+        * Tells whether the given timeout ticket is valid: the table must be in state
+        * {@code RUNNING} and the timer service must accept the ticket for the allocation id.
+        */
+       @Override
+       public boolean isValidTimeout(AllocationID allocationId, UUID ticket) {
+               checkStarted();
+
+               if (state != State.RUNNING) {
+                       return false;
+               }
+               return timerService.isValid(allocationId, ticket);
+       }
+
+       /**
+        * Tells whether a slot is allocated for the given job and allocation id. If a slot exists
+        * at the index, that slot decides; otherwise only a negative index can match, by lookup
+        * of the allocation id in {@code allocatedSlots}.
+        */
+       @Override
+       public boolean isAllocated(int index, JobID jobId, AllocationID allocationId) {
+               final TaskSlot indexedSlot = taskSlots.get(index);
+               if (indexedSlot != null) {
+                       return indexedSlot.isAllocated(jobId, allocationId);
+               }
+
+               return index < 0 && allocatedSlots.containsKey(allocationId);
+       }
+
+       /**
+        * Tries to mark the slot of the given allocation id as active, provided a slot is found
+        * and it is allocated for the given job and allocation id.
+        *
+        * @return the result of marking the slot active, or {@code false} if there is no matching slot
+        */
+       @Override
+       public boolean tryMarkSlotActive(JobID jobId, AllocationID allocationId) {
+               final TaskSlot slot = getTaskSlot(allocationId);
+
+               final boolean allocatedForJob = slot != null && slot.isAllocated(jobId, allocationId);
+               return allocatedForJob && slot.markActive();
+       }
+
+       /** Tells whether no slot is registered in {@code taskSlots} under the given index. */
+       @Override
+       public boolean isSlotFree(int index) {
+               final boolean occupied = taskSlots.containsKey(index);
+               return !occupied;
+       }
+
+       /** Tells whether the allocated-slots iterator of the given job yields at least one slot. */
+       @Override
+       public boolean hasAllocatedSlots(JobID jobId) {
+               final Iterator<TaskSlot> allocatedSlotsOfJob = getAllocatedSlots(jobId);
+               return allocatedSlotsOfJob.hasNext();
+       }
+
+       /**
+        * Returns a new {@code TaskSlotIterator} for the given job, created with
+        * {@code TaskSlotState.ALLOCATED} (the iterator type is defined outside this excerpt).
+        */
+       @Override
+       public Iterator<TaskSlot> getAllocatedSlots(JobID jobId) {
+               final Iterator<TaskSlot> allocatedSlotsOfJob = new TaskSlotIterator(jobId, TaskSlotState.ALLOCATED);
+               return allocatedSlotsOfJob;
+       }
+
+       /**
+        * Returns a new {@code AllocationIDIterator} for the given job, created with
+        * {@code TaskSlotState.ACTIVE} (the iterator type is defined outside this excerpt).
+        */
+       @Override
+       public Iterator<AllocationID> getActiveSlots(JobID jobId) {
+               final Iterator<AllocationID> activeAllocationIds = new AllocationIDIterator(jobId, TaskSlotState.ACTIVE);
+               return activeAllocationIds;
+       }
+
+       /**
+        * Returns the job id of the slot found for the given allocation id, or {@code null} if
+        * no slot is found.
+        */
+       @Override
+       @Nullable
+       public JobID getOwningJob(AllocationID allocationId) {
+               final TaskSlot slot = getTaskSlot(allocationId);
+               return slot == null ? null : slot.getJobId();
+       }
+
+       // ---------------------------------------------------------------------
+       // Task methods
+       // ---------------------------------------------------------------------
+
+       /**
+        * Adds the task to the slot identified by the task's allocation id and, on success,
+        * records the execution attempt id to task/slot mapping.
+        *
+        * @param task task to add; must not be null
+        * @return {@code true} if the slot accepted the task, {@code false} otherwise
+        * @throws SlotNotFoundException if no slot is found for the task's allocation id
+        * @throws SlotNotActiveException if the slot is not active for the task's job and allocation id
+        */
+       @Override
+       public boolean addTask(Task task) throws SlotNotFoundException, SlotNotActiveException {
+               checkRunning();
+               Preconditions.checkNotNull(task);
+
+               final TaskSlot slot = getTaskSlot(task.getAllocationId());
+               if (slot == null) {
+                       throw new SlotNotFoundException(task.getAllocationId());
+               }
+
+               if (!slot.isActive(task.getJobID(), task.getAllocationId())) {
+                       throw new SlotNotActiveException(task.getJobID(), task.getAllocationId());
+               }
+
+               if (!slot.add(task)) {
+                       return false;
+               }
+
+               taskSlotMappings.put(task.getExecutionId(), new TaskSlotMapping(task, slot));
+               return true;
+       }
+
+       @Override
+       public Task removeTask(ExecutionAttemptID executionAttemptID) {
+               checkRunningOrClosing();
 
 Review comment:
   Should we replace this check with `checkStarted()`? That way we would not 
require that all task removal calls arrive before the close operation finishes. 
Otherwise we would need to add a test which guards this assumption about the 
`Task`.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to