huwh commented on code in PR #21496:
URL: https://github.com/apache/flink/pull/21496#discussion_r1055370113


##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java:
##########
@@ -286,17 +314,119 @@ public void onError(Throwable exception) {
         onFatalError(exception);
     }
 
+    @VisibleForTesting
+    public int releaseUnWantedResources(
+            Collection<InstanceID> unwantedWorkers, int 
needReleaseWorkerNumber) {
+
+        Exception cause =
+                new FlinkExpectedException(
+                        "slot manager has determined that the resource is no 
longer needed");
+        for (InstanceID unwantedWorker : unwantedWorkers) {
+            if (needReleaseWorkerNumber <= 0) {
+                break;
+            }
+            if (releaseResource(unwantedWorker, cause)) {
+                needReleaseWorkerNumber--;
+            }
+        }
+        return needReleaseWorkerNumber;
+    }
+
     // ------------------------------------------------------------------------
     //  Internal
     // ------------------------------------------------------------------------
 
-    private void releaseResource(InstanceID instanceId, Exception cause) {
+    private void updateResourceDeclaration(ResourceDeclaration 
newResourceDeclaration) {
+        WorkerResourceSpec workerResourceSpec = 
newResourceDeclaration.getSpec();
+        ResourceDeclaration oldDeclaration = 
this.resourceDeclarations.get(workerResourceSpec);
+
+        Set<InstanceID> filteredUnwantedWorkers = new HashSet<>();
+        if (oldDeclaration != null) {
+            oldDeclaration
+                    .getUnwantedWorkers()
+                    .forEach(
+                            instanceID -> {
+                                if (getWorkerByInstanceId(instanceID) != null) 
{
+                                    filteredUnwantedWorkers.add(instanceID);
+                                }
+                            });
+        }
+
+        newResourceDeclaration
+                .getUnwantedWorkers()
+                .forEach(
+                        instanceID -> {
+                            if (getWorkerByInstanceId(instanceID) != null) {
+                                filteredUnwantedWorkers.add(instanceID);
+                            }
+                        });
+
+        this.resourceDeclarations.put(
+                workerResourceSpec,
+                new ResourceDeclaration(
+                        workerResourceSpec,
+                        newResourceDeclaration.getNumNeeded(),
+                        filteredUnwantedWorkers));
+    }
+
+    private void checkResourceDeclarations() {
+        for (ResourceDeclaration resourceDeclaration : 
resourceDeclarations.values()) {
+            WorkerResourceSpec workerResourceSpec = 
resourceDeclaration.getSpec();
+            int declaredWorkerNumber = resourceDeclaration.getNumNeeded();
+
+            final int releaseOrRequestWorkerNumber =
+                    totalWorkerCounter.getNum(workerResourceSpec) - 
declaredWorkerNumber;
+
+            if (releaseOrRequestWorkerNumber > 0) {
+                log.debug(
+                        "need release {} workers, current worker number {}, 
declared worker number {}",
+                        releaseOrRequestWorkerNumber,
+                        totalWorkerCounter.getNum(workerResourceSpec),
+                        declaredWorkerNumber);
+
+                // release unwanted workers.
+                int remainingReleasingWorkerNumber =
+                        releaseUnWantedResources(
+                                resourceDeclaration.getUnwantedWorkers(),
+                                releaseOrRequestWorkerNumber);
+
+                // TODO, release pending/starting/running workers to exceed 
declared worker number.

Review Comment:
   Yes, I would like to keep the order of released workers as: unwanted worker, 
pending requested, allocated but not registered, registered.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to