xintongsong commented on code in PR #21496:
URL: https://github.com/apache/flink/pull/21496#discussion_r1048173833


##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerTracker.java:
##########
@@ -68,6 +68,31 @@ void addTaskManager(
      */
     Map<JobID, ResourceCounter> removePendingTaskManager(PendingTaskManagerId pendingTaskManagerId);
 
+    /**
+     * Add an unwanted task manager.
+     *
+     * @param instanceId identifier of task manager.
+     * @param cause the reason of mark this task manager as unwanted.
+     */
+    void addUnWantedTaskManager(InstanceID instanceId, Exception cause);
+
+    /**
+     * Add an unwanted task manager with resource profile.
+     *
+     * @param instanceId identifier of task manager.
+     * @param cause the reason of mark this task manager as unwanted.
+     * @param totalResourceProfile the total resource profile of task manager.
+     * @param defaultSlotResourceProfile the default resource profile of task manager.
+     */
+    void addUnWantedTaskManagerWithResource(
+            InstanceID instanceId,
+            Exception cause,
+            ResourceProfile totalResourceProfile,
+            ResourceProfile defaultSlotResourceProfile);

Review Comment:
   Why do we need 2 methods for this interface?
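   A minimal sketch of a single-method alternative, assuming the tracker already records each task manager's total and default slot profiles at registration (which the `addTaskManager` hunk context suggests). All names here are hypothetical, Strings stand in for `InstanceID`/`ResourceProfile`, and the `cause` parameter is left out for brevity.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

/** Simplified stand-in for the profiles recorded when a task manager registers (hypothetical). */
final class RegisteredTaskManagerInfo {
    final String totalResourceProfile;
    final String defaultSlotResourceProfile;

    RegisteredTaskManagerInfo(String totalResourceProfile, String defaultSlotResourceProfile) {
        this.totalResourceProfile = totalResourceProfile;
        this.defaultSlotResourceProfile = defaultSlotResourceProfile;
    }
}

/** Hypothetical tracker that needs only one "add unwanted" method. */
final class SingleMethodTrackerSketch {
    // Profiles reported at registration, keyed by instance id.
    private final Map<String, RegisteredTaskManagerInfo> registered = new HashMap<>();
    // Unwanted task managers, with profiles looked up from the registration record.
    private final Map<String, RegisteredTaskManagerInfo> unwanted = new HashMap<>();

    void addTaskManager(String instanceId, String totalProfile, String defaultSlotProfile) {
        registered.put(instanceId, new RegisteredTaskManagerInfo(totalProfile, defaultSlotProfile));
    }

    /** Single entry point: callers pass only the id; the tracker already knows the resources. */
    void addUnwantedTaskManager(String instanceId) {
        RegisteredTaskManagerInfo info = registered.get(instanceId);
        if (info == null) {
            throw new IllegalArgumentException("Unknown task manager: " + instanceId);
        }
        unwanted.put(instanceId, info);
    }

    Collection<String> getUnwantedTaskManagerIds() {
        return unwanted.keySet();
    }

    RegisteredTaskManagerInfo getUnwantedInfo(String instanceId) {
        return unwanted.get(instanceId);
    }
}
```

   With this shape, the "with resource" variant becomes unnecessary, because the resource information is derived from state the tracker already holds.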



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java:
##########
@@ -206,15 +220,66 @@ protected Optional<WorkerType> getWorkerNodeIfAcceptRegistration(ResourceID reso
         return Optional.ofNullable(workerNodeMap.get(resourceID));
     }
 
+    @VisibleForTesting
+    public void declareResourceNeeded(Collection<ResourceDeclaration> resourceDeclarations) {
+        for (ResourceDeclaration resourceDeclaration : resourceDeclarations) {
+            WorkerResourceSpec workerResourceSpec = resourceDeclaration.getSpec();
+            int declaredWorkerNumber = resourceDeclaration.getNumNeeded();
+
+            final int releaseOrRequestWorkerNumber =
+                    allocatedWorkerCounter.getNum(workerResourceSpec)
+                            + pendingRequestCounter.getNum(workerResourceSpec)
+                            - declaredWorkerNumber;
+
+            if (releaseOrRequestWorkerNumber > 0) {
+                log.debug(
+                        "need release {} workers, current pending requested worker number {}, allocated worker number {}, declared worker number {}",
+                        releaseOrRequestWorkerNumber,
+                        pendingRequestCounter.getNum(workerResourceSpec),
+                        allocatedWorkerCounter.getNum(workerResourceSpec),
+                        declaredWorkerNumber);
+
+                // release unwanted workers.
+                int remainingReleasingWorkerNumber =
+                        releaseUnWantedResources(
+                                resourceDeclaration.getUnwanted(), releaseOrRequestWorkerNumber);
+
+                // TODO, release pending/starting/running workers to exceed declared worker number.
+                if (remainingReleasingWorkerNumber > 0) {
+                    log.debug(
+                            "need release {} workers after release unwanted workers.",
+                            remainingReleasingWorkerNumber);
+                }
+            } else if (releaseOrRequestWorkerNumber < 0) {
+                int requestWorkerNumber = -releaseOrRequestWorkerNumber;
+                log.debug(
+                        "need request {} new workers, current pending requested worker number {}, allocated worker number {}, declared worker number {}",
+                        requestWorkerNumber,
+                        pendingRequestCounter.getNum(workerResourceSpec),
+                        allocatedWorkerCounter.getNum(workerResourceSpec),
+                        declaredWorkerNumber);
+                for (int i = 0; i < requestWorkerNumber; i++) {
+                    requestNewWorker(workerResourceSpec);
+                }
+            } else {
+                log.debug(
+                        "current pending worker {} and allocated worker {} meets the declared worker {}",
+                        pendingRequestCounter.getNum(workerResourceSpec),
+                        allocatedWorkerCounter.getNum(workerResourceSpec),
+                        declaredWorkerNumber);
+            }
+        }
+    }
+
     @Override
     protected void onWorkerRegistered(WorkerType worker) {
         final ResourceID resourceId = worker.getResourceID();
         log.info("Worker {} is registered.", resourceId.getStringWithMetadata());
 
-        final WorkerResourceSpec workerResourceSpec =
-                currentAttemptUnregisteredWorkers.remove(resourceId);
         tryRemovePreviousPendingRecoveryTaskManager(resourceId);
-        if (workerResourceSpec != null) {
+        if (currentAttemptUnregisteredWorkers.remove(resourceId)) {
+            final WorkerResourceSpec workerResourceSpec =
+                    checkNotNull(currentAttemptWorkerSpec.get(resourceId));

Review Comment:
   How do we deal with the recovered workers? For a recovered worker, its `WorkerResourceSpec` is unknown until registration.
   
   I think the current implementation will lead to an inconsistency between the slot manager and the active resource manager: the slot manager knows the resources of the recovered and registered workers, while the active resource manager doesn't.
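   One possible way to keep the two views consistent, sketched under assumptions: the spec of a recovered worker can be derived from its registration, and the resource manager records it at that point instead of requiring it up front. Resource ids and specs are plain Strings, and every name below is hypothetical rather than the PR's code.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical sketch: the active resource manager learns a recovered worker's spec at
 * registration so its per-spec counts match what the slot manager sees.
 */
final class RecoveredWorkerSketch {
    // Requested in the current attempt and not yet registered.
    private final Set<String> currentAttemptUnregistered = new HashSet<>();
    // Spec of every worker whose spec is known; recovered workers are added at registration.
    private final Map<String, String> workerSpecs = new HashMap<>();
    // Registered workers per spec.
    private final Map<String, Integer> registeredPerSpec = new HashMap<>();

    void onWorkerRequested(String resourceId, String spec) {
        currentAttemptUnregistered.add(resourceId);
        workerSpecs.put(resourceId, spec);
    }

    /**
     * @param specFromRegistration spec derived from the registration (assumed available); only
     *     used for recovered workers, whose spec was not recorded at request time.
     */
    void onWorkerRegistered(String resourceId, String specFromRegistration) {
        final String spec;
        if (currentAttemptUnregistered.remove(resourceId)) {
            // Requested in this attempt: the spec was recorded when requesting.
            spec = workerSpecs.get(resourceId);
        } else {
            // Recovered from a previous attempt: record the spec now.
            spec = specFromRegistration;
            workerSpecs.put(resourceId, spec);
        }
        registeredPerSpec.merge(spec, 1, Integer::sum);
    }

    int registeredCount(String spec) {
        return registeredPerSpec.getOrDefault(spec, 0);
    }

    String specOf(String resourceId) {
        return workerSpecs.get(resourceId);
    }
}
```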



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceDeclaration.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
+
+import java.util.Collection;
+
+/** ResourceDeclaration for {@link ResourceAllocator}. */
+public class ResourceDeclaration {
+    private final WorkerResourceSpec spec;
+    private final int numNeeded;
+    private final Collection<UnwantedWorker> unwanted;

Review Comment:
   We should explain the definition of `unwanted` in JavaDocs.
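   A possible wording, sketched as a simplified class. The meaning given here is inferred from this PR (unwanted workers are released first in `declareResourceNeeded`, and the reasons mentioned elsewhere in this review are idle timeout and the max total resource limit); the class name is hypothetical and a String stands in for `WorkerResourceSpec`.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;

/** Sketch of ResourceDeclaration with the meaning of {@code unwanted} spelled out (hypothetical wording). */
final class ResourceDeclarationSketch {
    /** Worker spec this declaration applies to. */
    private final String spec;

    /** Total number of workers of {@link #spec} the slot manager needs. */
    private final int numNeeded;

    /**
     * Workers of {@link #spec} that the slot manager no longer needs, e.g. because they have
     * been idle too long or exceed the max total resource. This is only a hint: when allocated
     * plus pending workers exceed {@link #numNeeded}, the allocator should release these first,
     * but it is not required to release all of them.
     */
    private final Collection<String> unwanted;

    ResourceDeclarationSketch(String spec, int numNeeded, Collection<String> unwanted) {
        if (numNeeded < 0) {
            throw new IllegalArgumentException("numNeeded must be non-negative: " + numNeeded);
        }
        this.spec = spec;
        this.numNeeded = numNeeded;
        // Defensive, read-only copy so the declaration cannot change after it is sent.
        this.unwanted = Collections.unmodifiableList(new ArrayList<>(unwanted));
    }

    String getSpec() {
        return spec;
    }

    int getNumNeeded() {
        return numNeeded;
    }

    Collection<String> getUnwanted() {
        return unwanted;
    }
}
```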



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/UnwantedWorkerWithResourceProfile.java:
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+
+/** Unwanted workers with specific resource profile. */
+public class UnwantedWorkerWithResourceProfile {
+    private final UnwantedWorker unwantedWorker;
+    private final ResourceProfile totalResourceProfile;
+    private final ResourceProfile defaultSlotResourceProfile;
+    private final int numSlots;

Review Comment:
   It seems these are only used for generating `WorkerResourceSpec`. Then why not just store the `WorkerResourceSpec`?
   
   Moreover, we may consider making this an inner class of `TaskManagerTracker`.
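   A minimal sketch of both suggestions together: a nested value class that stores only the derived worker spec, plus a helper that groups unwanted ids per spec, which is the shape a per-spec resource declaration needs. Names are hypothetical and a String stands in for `WorkerResourceSpec`; how the spec is derived from the profiles and slot count is left out.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical tracker sketch with a nested value class for unwanted workers. */
final class UnwantedTrackerSketch {

    /** Stores the derived worker spec instead of total/default-slot profiles and slot count. */
    static final class UnwantedWorkerWithSpec {
        private final String instanceId;
        private final String workerResourceSpec;

        UnwantedWorkerWithSpec(String instanceId, String workerResourceSpec) {
            this.instanceId = instanceId;
            this.workerResourceSpec = workerResourceSpec;
        }

        String getInstanceId() {
            return instanceId;
        }

        String getWorkerResourceSpec() {
            return workerResourceSpec;
        }
    }

    private final List<UnwantedWorkerWithSpec> unwanted = new ArrayList<>();

    void addUnwanted(String instanceId, String workerResourceSpec) {
        unwanted.add(new UnwantedWorkerWithSpec(instanceId, workerResourceSpec));
    }

    List<UnwantedWorkerWithSpec> getUnwanted() {
        return Collections.unmodifiableList(unwanted);
    }

    /** Groups unwanted instance ids by spec, i.e. one group per resource declaration. */
    Map<String, List<String>> unwantedIdsBySpec() {
        return unwanted.stream()
                .collect(Collectors.groupingBy(
                        UnwantedWorkerWithSpec::getWorkerResourceSpec,
                        Collectors.mapping(UnwantedWorkerWithSpec::getInstanceId, Collectors.toList())));
    }
}
```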



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java:
##########
@@ -89,8 +91,17 @@
     /** Number of requested and not registered workers per worker resource spec. */
     private final WorkerCounter pendingWorkerCounter;
 
-    /** Identifiers and worker resource spec of requested not registered workers. */
-    private final Map<ResourceID, WorkerResourceSpec> currentAttemptUnregisteredWorkers;
+    /** Number of requested but not allocated workers per worker resource spec. */
+    private final WorkerCounter pendingRequestCounter;

Review Comment:
   Could you explain how `pendingRequestCounter` is different from `pendingWorkerCounter`, and why we need it?
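   To make the question concrete, here is a sketch of how the three counters would move over a worker's lifecycle if they follow their JavaDocs ("requested and not registered" vs. "requested but not allocated"). It assumes the allocated counter includes registered workers, which the `allocated + pendingRequest - declared` arithmetic in `declareResourceNeeded` seems to require. All names are hypothetical and specs are Strings.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical per-spec counters along a worker's lifecycle: requested, allocated, registered. */
final class WorkerCountersSketch {
    // Requested, but nothing allocated yet (the role described for pendingRequestCounter).
    private final Map<String, Integer> pendingRequests = new HashMap<>();
    // Requested and not yet registered, allocated or not (the role described for pendingWorkerCounter).
    private final Map<String, Integer> pendingWorkers = new HashMap<>();
    // Allocated workers, registered or not (assumed meaning of allocatedWorkerCounter).
    private final Map<String, Integer> allocated = new HashMap<>();

    private static void add(Map<String, Integer> counter, String spec, int delta) {
        counter.merge(spec, delta, Integer::sum);
    }

    private static int get(Map<String, Integer> counter, String spec) {
        return counter.getOrDefault(spec, 0);
    }

    void onRequested(String spec) {
        add(pendingRequests, spec, 1);
        add(pendingWorkers, spec, 1);
    }

    void onAllocated(String spec) {
        add(pendingRequests, spec, -1);
        add(allocated, spec, 1);
    }

    void onRegistered(String spec) {
        add(pendingWorkers, spec, -1);
    }

    int pendingRequestCount(String spec) {
        return get(pendingRequests, spec);
    }

    int pendingWorkerCount(String spec) {
        return get(pendingWorkers, spec);
    }

    /** Mirrors declareResourceNeeded: positive means release that many, negative means request. */
    int releaseOrRequest(String spec, int declared) {
        return get(allocated, spec) + get(pendingRequests, spec) - declared;
    }
}
```

   Under these assumptions, a pending request is always also a pending worker, and the two only diverge for workers that have been allocated but not yet registered.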



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/UnwantedWorker.java:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.runtime.instance.InstanceID;
+
+import java.util.Objects;
+
+/** Unwanted workers of {@link SlotManager}. */
+public class UnwantedWorker {
+    private final InstanceID id;
+    private final Exception exception;

Review Comment:
   I'm not sure about adding an exception for each unwanted worker in the declaration. It seems to assume that the unwanted workers will be closed with the given exception, which goes against the idea that the unwanted workers should only be a hint.
   
   Currently, there are only two possible reasons for the exception here: reaching the max resource limitation, and TM idle timeout. I think for these two, we can simply close the TM connection with a reason like "slot manager has determined that the resource is no longer needed", and log on the slot manager side why that determination was made.
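   A minimal sketch of that split, with all names hypothetical: the slot manager logs the specific reason (one of the two cases above) when it marks a worker unwanted, the unwanted entry carries no exception, and the allocator, which may release any subset of the hinted workers, closes them with a single generic message. `java.util.logging` stands in for Flink's SLF4J logging.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Logger;

/** Why the slot manager no longer needs a worker (the two cases named in the review). */
enum UnwantedReason {
    MAX_RESOURCE_EXCEEDED,
    IDLE_TIMEOUT
}

/** Hypothetical sketch: reasons are logged on the slot manager side, not carried in the hint. */
final class UnwantedWorkerReleaseSketch {
    static final String GENERIC_CLOSE_REASON =
            "The slot manager has determined that the resource is no longer needed.";
    private static final Logger LOG = Logger.getLogger(UnwantedWorkerReleaseSketch.class.getName());

    // Unwanted workers, identified by id only; no exception attached.
    private final List<String> unwantedIds = new ArrayList<>();
    // Messages the allocator used when closing workers.
    private final List<String> closeMessages = new ArrayList<>();

    /** Slot manager side: record the hint and log the specific reason here. */
    void markUnwanted(String instanceId, UnwantedReason reason) {
        LOG.info(() -> "Task manager " + instanceId + " is no longer needed: " + reason);
        unwantedIds.add(instanceId);
    }

    /** Allocator side: releasing is optional per worker, and always uses the generic reason. */
    void release(String instanceId) {
        if (unwantedIds.remove(instanceId)) {
            closeMessages.add(instanceId + ": " + GENERIC_CLOSE_REASON);
        }
    }

    List<String> closeMessages() {
        return closeMessages;
    }
}
```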



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java:
##########
@@ -384,6 +396,82 @@ && isMaxTotalResourceExceededAfterAdding(totalResourceProfile)) {
         }
     }
 
+    private void declareNeededResourcesWithDelay() {
+        if (declareNeededResourceDelay.toMillis() <= 0) {

Review Comment:
   It might be better to check that the allocator is supported. IIUC, that's expected whenever this method is called.
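   A sketch of the suggested precondition. The interface below is a hypothetical stand-in for the allocator, and since the hunk only shows the `if` on the delay, the two branches (declare immediately vs. schedule after the delay) are assumptions. In Flink itself, `Preconditions.checkState` would be the natural way to express the check; a plain `IllegalStateException` keeps this sketch self-contained.

```java
import java.time.Duration;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for the allocator the slot manager declares resources to. */
interface ResourceAllocatorSketch {
    boolean isSupported();

    void declareResourceNeeded();
}

/** Sketch of declaring needed resources with an optional delay, failing fast on misuse. */
final class DelayedDeclarationSketch {
    private final ResourceAllocatorSketch allocator;
    private final Duration declareNeededResourceDelay;
    private final ScheduledExecutorService scheduler;

    DelayedDeclarationSketch(
            ResourceAllocatorSketch allocator,
            Duration declareNeededResourceDelay,
            ScheduledExecutorService scheduler) {
        this.allocator = allocator;
        this.declareNeededResourceDelay = declareNeededResourceDelay;
        this.scheduler = scheduler;
    }

    void declareNeededResourcesWithDelay() {
        // Callers should only get here with a supported allocator, so treat anything else as a bug.
        if (!allocator.isSupported()) {
            throw new IllegalStateException("Resource allocator is not supported.");
        }
        if (declareNeededResourceDelay.toMillis() <= 0) {
            allocator.declareResourceNeeded();
        } else {
            scheduler.schedule(
                    allocator::declareResourceNeeded,
                    declareNeededResourceDelay.toMillis(),
                    TimeUnit.MILLISECONDS);
        }
    }
}
```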



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
