xintongsong commented on code in PR #22305:
URL: https://github.com/apache/flink/pull/22305#discussion_r1181534754
##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java:
##########
@@ -110,9 +121,93 @@ public ResourceAllocationResult tryFulfillRequirements(
jobId, unfulfilledJobRequirements, pendingResources,
resultBuilder);
}
}
+
+ // Unlike tryFulfillRequirementsForJobWithPendingResources, which updates pendingResources
+ // to the latest state after a new PendingTaskManager is created,
+ // tryFulFillRedundantResources will not update pendingResources even after new
+ // PendingTaskManagers are created.
+ // This is because the pendingResources are no longer needed afterwards.
+ tryFulFillRedundantResources(
+ totalResourceProfile.multiply(redundantTaskManagerNum),
+ registeredResources,
+ pendingResources,
+ resultBuilder);
+
return resultBuilder.build();
}
+ @Override
+ public ResourceReleaseResult tryReleaseUnusedResources(
+ TaskManagerResourceInfoProvider taskManagerResourceInfoProvider) {
+ ResourceProfile requiredRedundantResources =
+ totalResourceProfile.multiply(redundantTaskManagerNum);
+ ResourceReleaseResult.Builder builder = ResourceReleaseResult.builder();
+
+ List<TaskManagerInfo> taskManagersIdleTimeout = new ArrayList<>();
+ List<TaskManagerInfo> taskManagersNonTimeout = new ArrayList<>();
+ long currentTime = System.currentTimeMillis();
+ taskManagerResourceInfoProvider
+ .getRegisteredTaskManagers()
+ .forEach(
+ taskManagerInfo -> {
+ if (taskManagerInfo.isIdle()
+ && currentTime - taskManagerInfo.getIdleSince()
+ >= taskManagerTimeout.toMilliseconds()) {
+ taskManagersIdleTimeout.add(taskManagerInfo);
+ } else {
+ taskManagersNonTimeout.add(taskManagerInfo);
+ }
+ });
+
+ List<PendingTaskManager> pendingTaskManagersNonUse = new ArrayList<>();
+ List<PendingTaskManager> pendingTaskManagersInuse = new ArrayList<>();
+ taskManagerResourceInfoProvider
+ .getPendingTaskManagers()
+ .forEach(
+ pendingTaskManager -> {
+ if (pendingTaskManager.getPendingSlotAllocationRecords().isEmpty()) {
+ pendingTaskManagersNonUse.add(pendingTaskManager);
+ } else {
+ pendingTaskManagersInuse.add(pendingTaskManager);
+ }
+ });
+
+ if (taskManagersIdleTimeout.isEmpty() && pendingTaskManagersNonUse.isEmpty()) {
+ // short-cut for nothing to release
+ return builder.build();
+ }
+
+ ResourceProfile resourcesToKeep = ResourceProfile.ZERO;
+
+ // check whether available resources of used (pending) task manager is enough.
+ ResourceProfile availableResourcesOfNonIdle =
+ getAvailableResourceOfTaskManagers(taskManagersNonTimeout);
+ resourcesToKeep = resourcesToKeep.merge(availableResourcesOfNonIdle);
+ if (!canFulfillRequirement(requiredRedundantResources, resourcesToKeep)) {
+ ResourceProfile availableResourcesOfNonIdlePendingTaskManager =
+ getAvailableResourceOfPendingTaskManagers(pendingTaskManagersInuse);
+ resourcesToKeep = resourcesToKeep.merge(availableResourcesOfNonIdlePendingTaskManager);
+ }
+
+ // try reserve or release unused (pending) task managers
+ for (TaskManagerInfo taskManagerInfo : taskManagersIdleTimeout) {
+ if (canFulfillRequirement(requiredRedundantResources, resourcesToKeep)) {
Review Comment:
Minor: we can use a local boolean to skip unnecessary
`canFulfillRequirement` calls once the requirement has been fulfilled, e.g.
`if (redundantFulfilled || canFulfillRequirement(xxx, xxx))`.
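
A minimal sketch of what this could look like. It is not the PR's code: resources are modeled as plain `int`s, `canFulfillRequirement` here is a hypothetical stand-in, and the `else` branch (which is cut off in the hunk above) is assumed to only ever add to `resourcesToKeep`. Under that assumption the check cannot flip back to `false` once it has succeeded, which is what makes caching the result safe.

```java
import java.util.ArrayList;
import java.util.List;

public class RedundancyShortCircuitSketch {
    // Counts how often the (potentially expensive) check actually runs.
    static int checkCalls = 0;

    // Hypothetical stand-in for canFulfillRequirement; resources are plain ints here.
    static boolean canFulfillRequirement(int required, int available) {
        checkCalls++;
        return available >= required;
    }

    // Returns the idle task managers (by size) that may be released.
    static List<Integer> selectReleasable(int required, int resourcesToKeep, List<Integer> idle) {
        List<Integer> toRelease = new ArrayList<>();
        boolean redundantFulfilled = false;
        for (int taskManager : idle) {
            // Once fulfilled, the flag short-circuits and the check is not called again.
            if (redundantFulfilled || canFulfillRequirement(required, resourcesToKeep)) {
                redundantFulfilled = true;
                toRelease.add(taskManager);
            } else {
                // Assumption: keeping a task manager only ever grows resourcesToKeep.
                resourcesToKeep += taskManager;
            }
        }
        return toRelease;
    }
}
```

With four idle task managers of size 2 and a requirement of 4, the check runs three times instead of four; the saving grows with the number of idle task managers left after the requirement is met.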
##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceAllocationStrategy.java:
##########
@@ -46,4 +46,17 @@ ResourceAllocationResult tryFulfillRequirements(
Map<JobID, Collection<ResourceRequirement>> missingResources,
TaskManagerResourceInfoProvider taskManagerResourceInfoProvider,
BlockedTaskManagerChecker blockedTaskManagerChecker);
+
+ /**
+ * Try to make a release decision to release useless PendingTaskManagers and TaskManagers. This
Review Comment:
```suggestion
* Try to make a release decision to release unused PendingTaskManagers and TaskManagers. This
```
##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManager.java:
##########
@@ -135,19 +135,25 @@ class TaskExecutorManager implements AutoCloseable {
this.unWantedWorkers = new HashSet<>();
this.resourceAllocator = Preconditions.checkNotNull(resourceAllocator);
this.mainThreadExecutor = mainThreadExecutor;
- taskManagerTimeoutsAndRedundancyCheck =
- scheduledExecutor.scheduleWithFixedDelay(
- () ->
- mainThreadExecutor.execute(
- this::checkTaskManagerTimeoutsAndRedundancy),
- 0L,
- taskManagerTimeout.toMilliseconds(),
- TimeUnit.MILLISECONDS);
+ if (resourceAllocator.isSupported()) {
+ taskManagerTimeoutsAndRedundancyCheck =
+ scheduledExecutor.scheduleWithFixedDelay(
+ () ->
+ mainThreadExecutor.execute(
+ this::checkTaskManagerTimeoutsAndRedundancy),
+ 0L,
+ taskManagerTimeout.toMilliseconds(),
+ TimeUnit.MILLISECONDS);
+ } else {
+ taskManagerTimeoutsAndRedundancyCheck = null;
Review Comment:
Let's mark `taskManagerTimeoutsAndRedundancyCheck` as `@Nullable`.
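
A rough sketch of the pattern, not the actual `TaskExecutorManager`. The class name, the `supported` flag, and the local `Nullable` annotation are hypothetical stand-ins so the snippet compiles on its own; the real code would presumably use the project's existing `@Nullable` import. The point is that every later use of the field, such as cancelling it in `close()`, then needs a null check.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class NullableCheckSketch implements AutoCloseable {
    // Local stand-in annotation so the sketch needs no extra dependency.
    @interface Nullable {}

    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor();

    // Null when periodic checks are not supported.
    @Nullable private final ScheduledFuture<?> timeoutCheck;

    NullableCheckSketch(boolean supported, long periodMillis) {
        if (supported) {
            timeoutCheck =
                    executor.scheduleWithFixedDelay(
                            () -> {}, 0L, periodMillis, TimeUnit.MILLISECONDS);
        } else {
            timeoutCheck = null;
        }
    }

    boolean isCheckScheduled() {
        return timeoutCheck != null;
    }

    @Override
    public void close() {
        // The annotation documents that this guard is required.
        if (timeoutCheck != null) {
            timeoutCheck.cancel(false);
        }
        executor.shutdownNow();
    }
}
```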
##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceAllocationStrategy.java:
##########
@@ -46,4 +46,17 @@ ResourceAllocationResult tryFulfillRequirements(
Map<JobID, Collection<ResourceRequirement>> missingResources,
TaskManagerResourceInfoProvider taskManagerResourceInfoProvider,
BlockedTaskManagerChecker blockedTaskManagerChecker);
+
+ /**
+ * Try to make a release decision to release useless PendingTaskManagers and TaskManagers. This
Review Comment:
There also seem to be a few more occurrences of `useless`. Please replace
all of them with `unused` or `unwanted`.