huwh commented on code in PR #21496:
URL: https://github.com/apache/flink/pull/21496#discussion_r1055369152
##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java:
##########
@@ -286,17 +314,119 @@ public void onError(Throwable exception) {
onFatalError(exception);
}
+ @VisibleForTesting
+ public int releaseUnWantedResources(
+ Collection<InstanceID> unwantedWorkers, int
needReleaseWorkerNumber) {
+
+ Exception cause =
+ new FlinkExpectedException(
+ "slot manager has determined that the resource is no
longer needed");
+ for (InstanceID unwantedWorker : unwantedWorkers) {
+ if (needReleaseWorkerNumber <= 0) {
+ break;
+ }
+ if (releaseResource(unwantedWorker, cause)) {
+ needReleaseWorkerNumber--;
+ }
+ }
+ return needReleaseWorkerNumber;
+ }
+
// ------------------------------------------------------------------------
// Internal
// ------------------------------------------------------------------------
- private void releaseResource(InstanceID instanceId, Exception cause) {
+ private void updateResourceDeclaration(ResourceDeclaration
newResourceDeclaration) {
+ WorkerResourceSpec workerResourceSpec =
newResourceDeclaration.getSpec();
+ ResourceDeclaration oldDeclaration =
this.resourceDeclarations.get(workerResourceSpec);
+
+ Set<InstanceID> filteredUnwantedWorkers = new HashSet<>();
+ if (oldDeclaration != null) {
+ oldDeclaration
+ .getUnwantedWorkers()
+ .forEach(
+ instanceID -> {
+ if (getWorkerByInstanceId(instanceID) != null)
{
+ filteredUnwantedWorkers.add(instanceID);
+ }
+ });
+ }
+
+ newResourceDeclaration
+ .getUnwantedWorkers()
+ .forEach(
+ instanceID -> {
+ if (getWorkerByInstanceId(instanceID) != null) {
+ filteredUnwantedWorkers.add(instanceID);
+ }
+ });
+
+ this.resourceDeclarations.put(
+ workerResourceSpec,
+ new ResourceDeclaration(
+ workerResourceSpec,
+ newResourceDeclaration.getNumNeeded(),
+ filteredUnwantedWorkers));
+ }
Review Comment:
Addressed: ResourceDeclaration instances are unmodifiable and will only be
replaced in their entirety.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]