xintongsong commented on code in PR #21496:
URL: https://github.com/apache/flink/pull/21496#discussion_r1057033790


##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java:
##########
@@ -290,13 +321,79 @@ public void onError(Throwable exception) {
     //  Internal
     // ------------------------------------------------------------------------
 
-    private void releaseResource(InstanceID instanceId, Exception cause) {
+    @VisibleForTesting
+    public int releaseUnWantedResources(
+            Collection<InstanceID> unwantedWorkers, int needReleaseWorkerNumber) {
+
+        Exception cause =
+                new FlinkExpectedException(
+                        "slot manager has determined that the resource is no longer needed");
+        for (InstanceID unwantedWorker : unwantedWorkers) {
+            if (needReleaseWorkerNumber <= 0) {
+                break;
+            }
+            if (releaseResource(unwantedWorker, cause)) {
+                needReleaseWorkerNumber--;
+            }
+        }
+        return needReleaseWorkerNumber;
+    }
+
+    private void checkResourceDeclarations() {

Review Comment:
   Minor: it might be better to swap `checkResourceDeclarations` and 
`releaseUnWantedResources`. Usually, when method B is called from method A, B 
is placed below A.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceAllocator.java:
##########
@@ -41,12 +43,15 @@ public interface ResourceAllocator {
      *
      * @param resourceID identifying which resource to release
      */
-    void releaseResource(ResourceID resourceID);
+    void cleaningUpDisconnectedResource(ResourceID resourceID);

Review Comment:
   Minor: this rename probably belongs in a separate commit.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceAllocator.java:
##########
@@ -48,21 +41,12 @@ public boolean isSupported() {
     }
 
     @Override
-    public void releaseResource(InstanceID instanceId, Exception cause) {
-        releaseResourceConsumer.accept(instanceId, cause);
-    }
-
     public void cleaningUpDisconnectedResource(ResourceID resourceID) {
         throw new UnsupportedOperationException();
     }
 
-    @Override
-    public void allocateResource(WorkerResourceSpec workerResourceSpec) {
-        allocateResourceConsumer.accept(workerResourceSpec);
-    }
-
     @Override
     public void declareResourceNeeded(Collection<ResourceDeclaration> resourceDeclarations) {
-        //
+        declareResourceNeededConsumer.accept(resourceDeclarations);

Review Comment:
   Minor: `declareResourceNeededConsumer` should be introduced in the previous 
commit, together with the introduction of `declareResourceNeeded()`.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java:
##########
@@ -116,8 +116,8 @@ void start(
      * @param initialSlotReport for the new task manager
      * @param totalResourceProfile for the new task manager
      * @param defaultSlotResourceProfile for the new task manager
-     * @return True if the task manager has not been registered before and is registered
-     *     successfully; otherwise false
+     * @return True if the task manager has been registered before or is registered successfully;
+     *     otherwise false

Review Comment:
   The new semantics are counterintuitive and hard to understand. Moreover, 
they lead to duplicate calls of `ResourceManager#onWorkerRegistered`.
   
   Alternatively, we can introduce an enum type for the return value. E.g.,
   ```
   enum RegistrationResult {
       SUCCESS, // task manager has not been registered before and is registered successfully
       IGNORED, // task manager has been registered before and is ignored
       REJECTED, // task manager is rejected and should be disconnected
   }
   ```
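
A minimal sketch of how a caller might consume such a return value, so that the registration callback fires only for a genuinely new task manager. Only `RegistrationResult` and its three constants come from the suggestion above; `RegistrationHandler`, its `handle` method, and the recorded action strings are hypothetical stand-ins for illustration, not Flink's actual classes.

```java
import java.util.ArrayList;
import java.util.List;

/** Return value suggested in the review comment. */
enum RegistrationResult {
    SUCCESS, // task manager has not been registered before and is registered successfully
    IGNORED, // task manager has been registered before and is ignored
    REJECTED // task manager is rejected and should be disconnected
}

/** Hypothetical caller (an assumption, not Flink's real resource manager). */
class RegistrationHandler {
    /** Records what the caller would do, so the behavior is easy to inspect. */
    final List<String> actions = new ArrayList<>();

    void handle(String workerId, RegistrationResult result) {
        switch (result) {
            case SUCCESS:
                // Only a first-time registration triggers the callback.
                actions.add("onWorkerRegistered:" + workerId);
                break;
            case IGNORED:
                // Duplicate registration: no callback, no disconnect.
                break;
            case REJECTED:
                actions.add("disconnect:" + workerId);
                break;
        }
    }
}
```

With three distinct outcomes, a repeated registration no longer has to share a `true` return value with a first-time one, which is what would otherwise cause the duplicate callback.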



##########
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java:
##########
@@ -136,6 +142,251 @@ public void testStartNewWorker() throws Exception {
         };
     }
 
+    /** Tests unwanted worker released. */
+    @Test
+    public void testReleaseUnWantedResources() throws Exception {
+        new Context() {
+            {
+                final ResourceID tmResourceId = ResourceID.generate();
+                final CompletableFuture<ResourceID> releaseResourceFuture =
+                        new CompletableFuture<>();
+
+                driverBuilder
+                        .setRequestResourceFunction(
+                                taskExecutorProcessSpec ->
+                                        CompletableFuture.completedFuture(tmResourceId))
+                        .setReleaseResourceConsumer(releaseResourceFuture::complete);
+
+                runTest(
+                        () -> {
+                            // request new worker
+                            runInMainThread(
+                                    () ->
+                                            getResourceManager()
+                                                    .requestNewWorker(WORKER_RESOURCE_SPEC));
+
+                            CompletableFuture<RegistrationResponse> registerTaskExecutorFuture =
+                                    registerTaskExecutor(tmResourceId);
+                            assertThat(
+                                    registerTaskExecutorFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS),
+                                    instanceOf(RegistrationResponse.Success.class));
+
+                            InstanceID instanceID =
+                                    getResourceManager()
+                                            .getInstanceIdByResourceId(tmResourceId)
+                                            .get();
+
+                            runInMainThread(
+                                            () ->
+                                                    getResourceManager()
+                                                            .sendSlotReport(
+                                                                    tmResourceId,
+                                                                    instanceID,
+                                                                    new SlotReport(
+                                                                            new SlotStatus(
+                                                                                    new SlotID(
+                                                                                            tmResourceId,
+                                                                                            0),
+                                                                                    ResourceProfile
+                                                                                            .ANY)),
+                                                                    TIMEOUT_TIME))
+                                    .get(TIMEOUT_SEC, TimeUnit.SECONDS);
+
+                            // release unknown unwanted registered worker.
+                            CompletableFuture<Integer> releaseFuture =
+                                    runInMainThread(
+                                            () ->
+                                                    getResourceManager()
+                                                            .releaseUnWantedResources(

Review Comment:
   `releaseUnWantedResources` is an internal method of the RM rather than part 
of its protocol. The tests should not call it directly, because doing so may 
lead to inconsistent state and unpredictable behavior.
   
   Alternatively, the tests can use 
`getResourceManager().declareResourceNeeded()`.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManager.java:
##########
@@ -301,6 +307,45 @@ public Map<WorkerResourceSpec, Integer> getRequiredWorkers() {
                 : Collections.emptyMap();
     }
 
+    private Collection<ResourceDeclaration> getResourceDeclaration() {
+        final int pendingWorkerNum =
+                MathUtils.divideRoundUp(getNumberPendingTaskManagerSlots(), numSlotsPerWorker);
+        Set<InstanceID> neededRegisteredWorkers = new HashSet<>(taskManagerRegistrations.keySet());
+        neededRegisteredWorkers.removeAll(unWantedWorkers);
+        final int totalWorkerNum = pendingWorkerNum + neededRegisteredWorkers.size();
+
+        return Collections.singleton(
+                new ResourceDeclaration(
+                        defaultWorkerResourceSpec, totalWorkerNum, new HashSet<>(unWantedWorkers)));
+    }
+
+    private void declareNeededResourcesWithDelay() {

Review Comment:
   The `xxxWithDelay` pattern has been repeated 4 times 
(declarative/fine-grained slot manager, check resource requirement / declare 
needed resources), and there could be more in the future. I wonder if it can be 
deduplicated into a utility. Not necessary for this PR, though; it is already 
quite complicated.
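
A rough sketch of what such a utility could look like. It assumes the repeated pattern is "run immediately when the delay is not positive, otherwise schedule one delayed run and coalesce further triggers until it fires"; the actual method bodies are not shown in this diff, so that is an assumption. `DelayedAction` and its scheduler parameter are hypothetical names, not existing Flink classes.

```java
import java.time.Duration;
import java.util.function.BiConsumer;

/**
 * Hypothetical helper (not part of Flink): coalesces repeated triggers into a
 * single delayed execution. Intended to be used from one thread only, e.g. the
 * resource manager's main thread, so no synchronization is done here.
 */
class DelayedAction {
    private final Runnable action;
    private final Duration delay;
    // Abstracts "run this after that delay"; a real caller would plug in its
    // scheduled executor here.
    private final BiConsumer<Runnable, Duration> scheduler;
    private boolean pending;

    DelayedAction(Runnable action, Duration delay, BiConsumer<Runnable, Duration> scheduler) {
        this.action = action;
        this.delay = delay;
        this.scheduler = scheduler;
    }

    void trigger() {
        if (delay.isZero() || delay.isNegative()) {
            // No delay configured: run right away.
            action.run();
            return;
        }
        if (pending) {
            // A run is already scheduled; it will cover this trigger as well.
            return;
        }
        pending = true;
        scheduler.accept(
                () -> {
                    // Reset first so the action itself may trigger again.
                    pending = false;
                    action.run();
                },
                delay);
    }
}
```

Each of the four call sites would then hold one `DelayedAction` instance and call `trigger()` instead of carrying its own future and delay check.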


