xintongsong commented on a change in pull request #10682:
[FLINK-15247][Runtime] Wait for all slots to be free before task executor
services shutdown upon stopping
URL: https://github.com/apache/flink/pull/10682#discussion_r361275054
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
##########
@@ -403,54 +410,60 @@ public int freeSlot(AllocationID allocationId, Throwable
cause) throws SlotNotFo
TaskSlot taskSlot = getTaskSlot(allocationId);
if (taskSlot != null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Free slot {}.", taskSlot, cause);
- } else {
- LOG.info("Free slot {}.", taskSlot);
- }
+ return freeSlot(taskSlot, cause);
+ } else {
+ throw new SlotNotFoundException(allocationId);
+ }
+ }
- final JobID jobId = taskSlot.getJobId();
+ private int freeSlot(TaskSlot taskSlot, Throwable cause) {
+ AllocationID allocationId = taskSlot.getAllocationId();
- if (taskSlot.isEmpty()) {
- // remove the allocation id to task slot mapping
- allocatedSlots.remove(allocationId);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Free slot {}.", taskSlot, cause);
+ } else {
+ LOG.info("Free slot {}.", taskSlot);
+ }
- // unregister a potential timeout
- timerService.unregisterTimeout(allocationId);
+ final JobID jobId = taskSlot.getJobId();
- Set<AllocationID> slots =
slotsPerJob.get(jobId);
+ if (taskSlot.isEmpty()) {
+ // remove the allocation id to task slot mapping
+ allocatedSlots.remove(allocationId);
- if (slots == null) {
- throw new IllegalStateException("There
are no more slots allocated for the job " + jobId +
- ". This indicates a programming
bug.");
- }
+ // unregister a potential timeout
+ timerService.unregisterTimeout(allocationId);
- slots.remove(allocationId);
+ Set<AllocationID> slots = slotsPerJob.get(jobId);
- if (slots.isEmpty()) {
- slotsPerJob.remove(jobId);
- }
+ if (slots == null) {
+ throw new IllegalStateException("There are no
more slots allocated for the job " + jobId +
+ ". This indicates a programming bug.");
+ }
- taskSlot.close();
- taskSlots.remove(taskSlot.getIndex());
-
budgetManager.release(taskSlot.getResourceProfile());
+ slots.remove(allocationId);
- return taskSlot.getIndex();
- } else {
- // we couldn't free the task slot because it
still contains task, fail the tasks
- // and set the slot state to releasing so that
it gets eventually freed
- taskSlot.markReleasing();
+ if (slots.isEmpty()) {
+ slotsPerJob.remove(jobId);
+ }
- Iterator<Task> taskIterator =
taskSlot.getTasks();
+ taskSlots.remove(taskSlot.getIndex());
+ budgetManager.release(taskSlot.getResourceProfile());
+ taskSlot.close();
Review comment:
Is there any particular reason for moving `taskSlot.close()` after
`taskSlots.remove()` and `budgetManager.release()`?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services