xintongsong commented on a change in pull request #10682:
[FLINK-15247][Runtime] Wait for all slots to be free before task executor
services shutdown upon stopping
URL: https://github.com/apache/flink/pull/10682#discussion_r361274776
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
##########
@@ -403,54 +410,60 @@ public int freeSlot(AllocationID allocationId, Throwable
cause) throws SlotNotFo
TaskSlot taskSlot = getTaskSlot(allocationId);
if (taskSlot != null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Free slot {}.", taskSlot, cause);
- } else {
- LOG.info("Free slot {}.", taskSlot);
- }
+ return freeSlot(taskSlot, cause);
+ } else {
+ throw new SlotNotFoundException(allocationId);
+ }
+ }
- final JobID jobId = taskSlot.getJobId();
+ private int freeSlot(TaskSlot taskSlot, Throwable cause) {
+ AllocationID allocationId = taskSlot.getAllocationId();
- if (taskSlot.isEmpty()) {
- // remove the allocation id to task slot mapping
- allocatedSlots.remove(allocationId);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Free slot {}.", taskSlot, cause);
+ } else {
+ LOG.info("Free slot {}.", taskSlot);
+ }
- // unregister a potential timeout
- timerService.unregisterTimeout(allocationId);
+ final JobID jobId = taskSlot.getJobId();
- Set<AllocationID> slots =
slotsPerJob.get(jobId);
+ if (taskSlot.isEmpty()) {
+ // remove the allocation id to task slot mapping
+ allocatedSlots.remove(allocationId);
- if (slots == null) {
- throw new IllegalStateException("There
are no more slots allocated for the job " + jobId +
- ". This indicates a programming
bug.");
- }
+ // unregister a potential timeout
+ timerService.unregisterTimeout(allocationId);
- slots.remove(allocationId);
+ Set<AllocationID> slots = slotsPerJob.get(jobId);
- if (slots.isEmpty()) {
- slotsPerJob.remove(jobId);
- }
+ if (slots == null) {
+ throw new IllegalStateException("There are no
more slots allocated for the job " + jobId +
+ ". This indicates a programming bug.");
+ }
- taskSlot.close();
- taskSlots.remove(taskSlot.getIndex());
-
budgetManager.release(taskSlot.getResourceProfile());
+ slots.remove(allocationId);
- return taskSlot.getIndex();
- } else {
- // we couldn't free the task slot because it
still contains task, fail the tasks
- // and set the slot state to releasing so that
it gets eventually freed
- taskSlot.markReleasing();
+ if (slots.isEmpty()) {
+ slotsPerJob.remove(jobId);
+ }
- Iterator<Task> taskIterator =
taskSlot.getTasks();
+ taskSlots.remove(taskSlot.getIndex());
+ budgetManager.release(taskSlot.getResourceProfile());
+ taskSlot.close();
- while (taskIterator.hasNext()) {
-
taskIterator.next().failExternally(cause);
- }
+ return taskSlot.getIndex();
+ } else {
+ // we couldn't free the task slot because it still
contains task, fail the tasks
+ // and set the slot state to releasing so that it gets
eventually freed
+ taskSlot.markReleasing();
+
+ Iterator<Task> taskIterator = taskSlot.getTasks();
- return -1;
+ while (taskIterator.hasNext()) {
+ taskIterator.next().failExternally(cause);
}
- } else {
- throw new SlotNotFoundException(allocationId);
+
+ return -1;
}
}
Review comment:
Minor: This seems to be an irrelevant refactoring to me, which would be
better in a separate hotfix commit.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services