tillrohrmann commented on a change in pull request #10682: [FLINK-15247][Runtime] Wait for all slots to be free before task executor services shutdown upon stopping URL: https://github.com/apache/flink/pull/10682#discussion_r369022521
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImpl.java ########## @@ -0,0 +1,777 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.taskexecutor.slot; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceBudgetManager; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.clusterframework.types.SlotID; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.runtime.taskexecutor.SlotReport; +import org.apache.flink.runtime.taskexecutor.SlotStatus; +import org.apache.flink.runtime.taskmanager.Task; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import 
org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.stream.Collectors; + +/** + * Default implementation of {@link TaskSlotTable}. + */ +public class TaskSlotTableImpl implements TaskSlotTable { + + private static final Logger LOG = LoggerFactory.getLogger(TaskSlotTableImpl.class); + + /** + * Number of slots in static slot allocation. + * If slot is requested with an index, the requested index must within the range of [0, numberSlots). + * When generating slot report, we should always generate slots with index in [0, numberSlots) even the slot does not exist. + */ + private final int numberSlots; + + /** Slot resource profile for static slot allocation. */ + private final ResourceProfile defaultSlotResourceProfile; + + /** Page size for memory manager. */ + private final int memoryPageSize; + + /** Timer service used to time out allocated slots. */ + private final TimerService<AllocationID> timerService; + + /** The list of all task slots. */ + private final Map<Integer, TaskSlot> taskSlots; + + /** Mapping from allocation id to task slot. */ + private final Map<AllocationID, TaskSlot> allocatedSlots; + + /** Mapping from execution attempt id to task and task slot. */ + private final Map<ExecutionAttemptID, TaskSlotMapping> taskSlotMappings; + + /** Mapping from job id to allocated slots for a job. */ + private final Map<JobID, Set<AllocationID>> slotsPerJob; + + /** Interface for slot actions, such as freeing them or timing them out. */ + private SlotActions slotActions; + + /** The table state. 
*/ + private volatile State state; + + private final ResourceBudgetManager budgetManager; + + /** The closing future is completed when all slot are freed and state is closed. */ + private CompletableFuture<Void> closingFuture; + + /** {@link Executor} to schedule internal calls to the main thread. */ + private Executor mainThreadExecutor; + + public TaskSlotTableImpl( + final int numberSlots, + final ResourceProfile totalAvailableResourceProfile, + final ResourceProfile defaultSlotResourceProfile, + final int memoryPageSize, + final TimerService<AllocationID> timerService) { + + Preconditions.checkArgument(0 < numberSlots, "The number of task slots must be greater than 0."); + + this.numberSlots = numberSlots; + this.defaultSlotResourceProfile = Preconditions.checkNotNull(defaultSlotResourceProfile); + this.memoryPageSize = memoryPageSize; + + this.taskSlots = new HashMap<>(numberSlots); + + this.timerService = Preconditions.checkNotNull(timerService); + + budgetManager = new ResourceBudgetManager(Preconditions.checkNotNull(totalAvailableResourceProfile)); + + allocatedSlots = new HashMap<>(numberSlots); + + taskSlotMappings = new HashMap<>(4 * numberSlots); + + slotsPerJob = new HashMap<>(4); + + slotActions = null; + state = State.CREATED; + } + + @Override + public void start(SlotActions initialSlotActions, Executor mainThreadExecutor) { + Preconditions.checkState( + state == State.CREATED, + "The %s has to be just created before starting", + TaskSlotTableImpl.class.getSimpleName()); + this.slotActions = Preconditions.checkNotNull(initialSlotActions); + this.mainThreadExecutor = Preconditions.checkNotNull(mainThreadExecutor); + + timerService.start(this); + + state = State.RUNNING; + } + + @Override + public CompletableFuture<Void> closeAsync() { + if (state == State.CREATED) { + state = State.CLOSED; + closingFuture = CompletableFuture.completedFuture(null); + } else if (state == State.RUNNING) { + state = State.CLOSING; + final FlinkException cause = new 
FlinkException("Closing task slot table"); + closingFuture = FutureUtils + .waitForAll( + new ArrayList<>(allocatedSlots.values()) + .stream() + .map(slot -> freeSlot(slot, cause)) + .collect(Collectors.toList())) + .thenRunAsync( + () -> { + state = State.CLOSED; + timerService.stop(); + }, + mainThreadExecutor); + } + return closingFuture; + } + + @VisibleForTesting + public boolean isClosed() { + return state == State.CLOSED && + taskSlots.values().stream().allMatch(slot -> slot.getMemoryManager().isShutdown()) && + allocatedSlots.values().stream().allMatch(slot -> slot.getMemoryManager().isShutdown()); + } + + @Override + public Set<AllocationID> getAllocationIdsPerJob(JobID jobId) { + final Set<AllocationID> allocationIds = slotsPerJob.get(jobId); + + if (allocationIds == null) { + return Collections.emptySet(); + } else { + return Collections.unmodifiableSet(allocationIds); + } + } + + // --------------------------------------------------------------------- + // Slot report methods + // --------------------------------------------------------------------- + + @Override + public SlotReport createSlotReport(ResourceID resourceId) { + List<SlotStatus> slotStatuses = new ArrayList<>(); + + for (int i = 0; i < numberSlots; i++) { + SlotID slotId = new SlotID(resourceId, i); + SlotStatus slotStatus; + if (taskSlots.containsKey(i)) { + TaskSlot taskSlot = taskSlots.get(i); + + slotStatus = new SlotStatus( + slotId, + taskSlot.getResourceProfile(), + taskSlot.getJobId(), + taskSlot.getAllocationId()); + } else { + slotStatus = new SlotStatus( + slotId, + defaultSlotResourceProfile, + null, + null); + } + + slotStatuses.add(slotStatus); + } + + for (TaskSlot taskSlot : allocatedSlots.values()) { + if (taskSlot.getIndex() < 0) { + SlotID slotID = SlotID.generateDynamicSlotID(resourceId); + SlotStatus slotStatus = new SlotStatus( + slotID, + taskSlot.getResourceProfile(), + taskSlot.getJobId(), + taskSlot.getAllocationId()); + slotStatuses.add(slotStatus); + } + } + + 
final SlotReport slotReport = new SlotReport(slotStatuses); + + return slotReport; + } + + // --------------------------------------------------------------------- + // Slot methods + // --------------------------------------------------------------------- + + @Override + @VisibleForTesting + public boolean allocateSlot(int index, JobID jobId, AllocationID allocationId, Time slotTimeout) { + return allocateSlot(index, jobId, allocationId, defaultSlotResourceProfile, slotTimeout); + } + + @Override + public boolean allocateSlot(int index, JobID jobId, AllocationID allocationId, ResourceProfile resourceProfile, Time slotTimeout) { + checkRunning(); + + Preconditions.checkArgument(index < numberSlots); + + TaskSlot taskSlot = allocatedSlots.get(allocationId); + if (taskSlot != null) { + LOG.info("Allocation ID {} is already allocated in {}.", allocationId, taskSlot); + return false; + } + + if (taskSlots.containsKey(index)) { + TaskSlot duplicatedTaskSlot = taskSlots.get(index); + LOG.info("Slot with index {} already exist, with resource profile {}, job id {} and allocation id {}.", + index, + duplicatedTaskSlot.getResourceProfile(), + duplicatedTaskSlot.getJobId(), + duplicatedTaskSlot.getAllocationId()); + return duplicatedTaskSlot.getJobId().equals(jobId) && + duplicatedTaskSlot.getAllocationId().equals(allocationId); + } else if (allocatedSlots.containsKey(allocationId)) { + return true; + } + + resourceProfile = index >= 0 ? defaultSlotResourceProfile : resourceProfile; + + if (!budgetManager.reserve(resourceProfile)) { + LOG.info("Cannot allocate the requested resources. 
Trying to allocate {}, " + + "while the currently remaining available resources are {}, total is {}.", + resourceProfile, + budgetManager.getAvailableBudget(), + budgetManager.getTotalBudget()); + return false; + } + + taskSlot = new TaskSlot(index, resourceProfile, memoryPageSize, jobId, allocationId); + if (index >= 0) { + taskSlots.put(index, taskSlot); + } + + // update the allocation id to task slot map + allocatedSlots.put(allocationId, taskSlot); + + // register a timeout for this slot since it's in state allocated + timerService.registerTimeout(allocationId, slotTimeout.getSize(), slotTimeout.getUnit()); + + // add this slot to the set of job slots + Set<AllocationID> slots = slotsPerJob.get(jobId); + + if (slots == null) { + slots = new HashSet<>(4); + slotsPerJob.put(jobId, slots); + } + + slots.add(allocationId); + + return true; + } + + @Override + public boolean markSlotActive(AllocationID allocationId) throws SlotNotFoundException { + checkRunning(); + + TaskSlot taskSlot = getTaskSlot(allocationId); + + if (taskSlot != null) { + if (taskSlot.markActive()) { + // unregister a potential timeout + LOG.info("Activate slot {}.", allocationId); + + timerService.unregisterTimeout(allocationId); + + return true; + } else { + return false; + } + } else { + throw new SlotNotFoundException(allocationId); + } + } + + @Override + public boolean markSlotInactive(AllocationID allocationId, Time slotTimeout) throws SlotNotFoundException { + checkStarted(); + + TaskSlot taskSlot = getTaskSlot(allocationId); + + if (taskSlot != null) { + if (taskSlot.markInactive()) { + // register a timeout to free the slot + timerService.registerTimeout(allocationId, slotTimeout.getSize(), slotTimeout.getUnit()); + + return true; + } else { + return false; + } + } else { + throw new SlotNotFoundException(allocationId); + } + } + + @Override + public int freeSlot(AllocationID allocationId) throws SlotNotFoundException { + return freeSlot(allocationId, new Exception("The task slot of 
this task is being freed.")); + } + + @Override + public int freeSlot(AllocationID allocationId, Throwable cause) throws SlotNotFoundException { + checkStarted(); + + TaskSlot taskSlot = getTaskSlot(allocationId); + + if (taskSlot != null) { + return freeSlot(taskSlot, cause).isDone() ? taskSlot.getIndex() : -1; + } else { + throw new SlotNotFoundException(allocationId); + } + } + + private CompletableFuture<Void> freeSlot(TaskSlot taskSlot, Throwable cause) { + AllocationID allocationId = taskSlot.getAllocationId(); + + if (LOG.isDebugEnabled()) { + LOG.debug("Free slot {}.", taskSlot, cause); + } else { + LOG.info("Free slot {}.", taskSlot); + } + + final JobID jobId = taskSlot.getJobId(); + + if (taskSlot.isEmpty()) { + // remove the allocation id to task slot mapping + allocatedSlots.remove(allocationId); + + // unregister a potential timeout + timerService.unregisterTimeout(allocationId); + + Set<AllocationID> slots = slotsPerJob.get(jobId); + + if (slots == null) { + throw new IllegalStateException("There are no more slots allocated for the job " + jobId + + ". 
This indicates a programming bug."); + } + + slots.remove(allocationId); + + if (slots.isEmpty()) { + slotsPerJob.remove(jobId); + } + + taskSlots.remove(taskSlot.getIndex()); + budgetManager.release(taskSlot.getResourceProfile()); + } + return taskSlot.closeAsync(cause); + } + + @Override + public boolean isValidTimeout(AllocationID allocationId, UUID ticket) { + checkStarted(); + + return state == State.RUNNING && timerService.isValid(allocationId, ticket); + } + + @Override + public boolean isAllocated(int index, JobID jobId, AllocationID allocationId) { + TaskSlot taskSlot = taskSlots.get(index); + if (taskSlot != null) { + return taskSlot.isAllocated(jobId, allocationId); + } else if (index < 0) { + return allocatedSlots.containsKey(allocationId); + } else { + return false; + } + } + + @Override + public boolean tryMarkSlotActive(JobID jobId, AllocationID allocationId) { + TaskSlot taskSlot = getTaskSlot(allocationId); + + if (taskSlot != null && taskSlot.isAllocated(jobId, allocationId)) { + return taskSlot.markActive(); + } else { + return false; + } + } + + @Override + public boolean isSlotFree(int index) { + return !taskSlots.containsKey(index); + } + + @Override + public boolean hasAllocatedSlots(JobID jobId) { + return getAllocatedSlots(jobId).hasNext(); + } + + @Override + public Iterator<TaskSlot> getAllocatedSlots(JobID jobId) { + return new TaskSlotIterator(jobId, TaskSlotState.ALLOCATED); + } + + @Override + public Iterator<AllocationID> getActiveSlots(JobID jobId) { + return new AllocationIDIterator(jobId, TaskSlotState.ACTIVE); + } + + @Override + @Nullable + public JobID getOwningJob(AllocationID allocationId) { + final TaskSlot taskSlot = getTaskSlot(allocationId); + + if (taskSlot != null) { + return taskSlot.getJobId(); + } else { + return null; + } + } + + // --------------------------------------------------------------------- + // Task methods + // --------------------------------------------------------------------- + + @Override + public 
boolean addTask(Task task) throws SlotNotFoundException, SlotNotActiveException { + checkRunning(); + Preconditions.checkNotNull(task); + + TaskSlot taskSlot = getTaskSlot(task.getAllocationId()); + + if (taskSlot != null) { + if (taskSlot.isActive(task.getJobID(), task.getAllocationId())) { + if (taskSlot.add(task)) { + taskSlotMappings.put(task.getExecutionId(), new TaskSlotMapping(task, taskSlot)); + + return true; + } else { + return false; + } + } else { + throw new SlotNotActiveException(task.getJobID(), task.getAllocationId()); + } + } else { + throw new SlotNotFoundException(task.getAllocationId()); + } + } + + @Override + public Task removeTask(ExecutionAttemptID executionAttemptID) { + checkRunningOrClosing(); Review comment: Should we replace this check with `checkStarted()`? That way we would not require that all `removeTask` calls happen before the close operation has finished. Otherwise we would need to add a test which guards this assumption about the `Task`'s behavior. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services