xintongsong commented on code in PR #21496:
URL: https://github.com/apache/flink/pull/21496#discussion_r1048173833
##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerTracker.java:
##########
@@ -68,6 +68,31 @@ void addTaskManager(
*/
Map<JobID, ResourceCounter> removePendingTaskManager(PendingTaskManagerId pendingTaskManagerId);
+ /**
+ * Add an unwanted task manager.
+ *
+ * @param instanceId identifier of task manager.
+ * @param cause the reason of mark this task manager as unwanted.
+ */
+ void addUnWantedTaskManager(InstanceID instanceId, Exception cause);
+
+ /**
+ * Add an unwanted task manager with resource profile.
+ *
+ * @param instanceId identifier of task manager.
+ * @param cause the reason of mark this task manager as unwanted.
+ * @param totalResourceProfile the total resource profile of task manager.
+ * @param defaultSlotResourceProfile the default resource profile of task manager.
+ */
+ void addUnWantedTaskManagerWithResource(
+ InstanceID instanceId,
+ Exception cause,
+ ResourceProfile totalResourceProfile,
+ ResourceProfile defaultSlotResourceProfile);
Review Comment:
Why do we need 2 methods for this interface?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java:
##########
@@ -206,15 +220,66 @@ protected Optional<WorkerType> getWorkerNodeIfAcceptRegistration(ResourceID reso
return Optional.ofNullable(workerNodeMap.get(resourceID));
}
+ @VisibleForTesting
+ public void declareResourceNeeded(Collection<ResourceDeclaration> resourceDeclarations) {
+ for (ResourceDeclaration resourceDeclaration : resourceDeclarations) {
+ WorkerResourceSpec workerResourceSpec = resourceDeclaration.getSpec();
+ int declaredWorkerNumber = resourceDeclaration.getNumNeeded();
+
+ final int releaseOrRequestWorkerNumber =
+ allocatedWorkerCounter.getNum(workerResourceSpec)
+ + pendingRequestCounter.getNum(workerResourceSpec)
+ - declaredWorkerNumber;
+
+ if (releaseOrRequestWorkerNumber > 0) {
+ log.debug(
+ "need release {} workers, current pending requested worker number {}, allocated worker number {}, declared worker number {}",
+ releaseOrRequestWorkerNumber,
+ pendingRequestCounter.getNum(workerResourceSpec),
+ allocatedWorkerCounter.getNum(workerResourceSpec),
+ declaredWorkerNumber);
+
+ // release unwanted workers.
+ int remainingReleasingWorkerNumber =
+ releaseUnWantedResources(
+ resourceDeclaration.getUnwanted(), releaseOrRequestWorkerNumber);
+
+ // TODO, release pending/starting/running workers to exceed declared worker number.
+ if (remainingReleasingWorkerNumber > 0) {
+ log.debug(
+ "need release {} workers after release unwanted workers.",
+ remainingReleasingWorkerNumber);
+ }
+ } else if (releaseOrRequestWorkerNumber < 0) {
+ int requestWorkerNumber = -releaseOrRequestWorkerNumber;
+ log.debug(
+ "need request {} new workers, current pending requested worker number {}, allocated worker number {}, declared worker number {}",
+ requestWorkerNumber,
+ pendingRequestCounter.getNum(workerResourceSpec),
+ allocatedWorkerCounter.getNum(workerResourceSpec),
+ declaredWorkerNumber);
+ for (int i = 0; i < requestWorkerNumber; i++) {
+ requestNewWorker(workerResourceSpec);
+ }
+ } else {
+ log.debug(
+ "current pending worker {} and allocated worker {} meets the declared worker {}",
+ pendingRequestCounter.getNum(workerResourceSpec),
+ allocatedWorkerCounter.getNum(workerResourceSpec),
+ declaredWorkerNumber);
+ }
+ }
+ }
+
@Override
protected void onWorkerRegistered(WorkerType worker) {
final ResourceID resourceId = worker.getResourceID();
log.info("Worker {} is registered.", resourceId.getStringWithMetadata());
- final WorkerResourceSpec workerResourceSpec =
- currentAttemptUnregisteredWorkers.remove(resourceId);
tryRemovePreviousPendingRecoveryTaskManager(resourceId);
- if (workerResourceSpec != null) {
+ if (currentAttemptUnregisteredWorkers.remove(resourceId)) {
+ final WorkerResourceSpec workerResourceSpec =
+ checkNotNull(currentAttemptWorkerSpec.get(resourceId));
Review Comment:
How do we deal with recovered workers? For a recovered worker, its
`WorkerResourceSpec` is unknown until registration.
I think the current implementation will lead to an inconsistency between the
slot manager and the active resource manager: the slot manager knows the
resources of recovered and registered workers, while the active resource
manager doesn't.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceDeclaration.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
+
+import java.util.Collection;
+
+/** ResourceDeclaration for {@link ResourceAllocator}. */
+public class ResourceDeclaration {
+ private final WorkerResourceSpec spec;
+ private final int numNeeded;
+ private final Collection<UnwantedWorker> unwanted;
Review Comment:
We should explain the definition of `unwanted` in JavaDocs.
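As a rough illustration of what such a JavaDoc could say, here is a minimal sketch. The wording reflects one reading of this PR (unwanted workers are a hint about which workers to release first when more workers exist than `numNeeded`), not a confirmed definition. `ResourceDeclarationSketch` and its `String` stand-ins for `WorkerResourceSpec` and `UnwantedWorker` are hypothetical and only there so the snippet compiles on its own.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;

/** Hypothetical, simplified stand-in for ResourceDeclaration; Strings replace the Flink types. */
final class ResourceDeclarationSketch {
    /** Stand-in for WorkerResourceSpec: the kind of worker this declaration is about. */
    private final String spec;

    /** Total number of workers of this spec that should exist. */
    private final int numNeeded;

    /**
     * Workers that the slot manager would prefer to see released first if the number of
     * existing workers exceeds {@link #numNeeded}. Assumed semantics: this is only a hint,
     * so the receiver may release fewer, or different, workers.
     */
    private final Collection<String> unwanted;

    ResourceDeclarationSketch(String spec, int numNeeded, Collection<String> unwanted) {
        this.spec = spec;
        this.numNeeded = numNeeded;
        // Defensive copy so later changes to the caller's collection do not leak in.
        this.unwanted = Collections.unmodifiableCollection(new ArrayList<>(unwanted));
    }

    String getSpec() {
        return spec;
    }

    int getNumNeeded() {
        return numNeeded;
    }

    Collection<String> getUnwanted() {
        return unwanted;
    }
}
```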
##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/UnwantedWorkerWithResourceProfile.java:
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+
+/** Unwanted workers with specific resource profile. */
+public class UnwantedWorkerWithResourceProfile {
+ private final UnwantedWorker unwantedWorker;
+ private final ResourceProfile totalResourceProfile;
+ private final ResourceProfile defaultSlotResourceProfile;
+ private final int numSlots;
Review Comment:
Moreover, we may consider making this an inner class of `TaskManagerTracker`.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/UnwantedWorkerWithResourceProfile.java:
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+
+/** Unwanted workers with specific resource profile. */
+public class UnwantedWorkerWithResourceProfile {
+ private final UnwantedWorker unwantedWorker;
+ private final ResourceProfile totalResourceProfile;
+ private final ResourceProfile defaultSlotResourceProfile;
+ private final int numSlots;
Review Comment:
It seems these are only used for generating `WorkerResourceSpec`. Then why
not just store the `WorkerResourceSpec`?
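A minimal sketch of that alternative, under stated assumptions: `UnwantedWorkerWithSpec` and `WorkerSpecStandIn` are hypothetical names, `WorkerSpecStandIn` is a two-field placeholder for Flink's `WorkerResourceSpec`, and a `String` replaces `UnwantedWorker`, so the snippet is self-contained rather than a drop-in change for the PR.

```java
import java.util.Objects;

/** Placeholder for Flink's WorkerResourceSpec, reduced to two fields for this sketch. */
final class WorkerSpecStandIn {
    final double cpuCores;
    final int numSlots;

    WorkerSpecStandIn(double cpuCores, int numSlots) {
        this.cpuCores = cpuCores;
        this.numSlots = numSlots;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof WorkerSpecStandIn)) {
            return false;
        }
        WorkerSpecStandIn that = (WorkerSpecStandIn) o;
        return Double.compare(cpuCores, that.cpuCores) == 0 && numSlots == that.numSlots;
    }

    @Override
    public int hashCode() {
        return Objects.hash(cpuCores, numSlots);
    }
}

/** Hypothetical replacement that stores the derived spec instead of three raw inputs. */
final class UnwantedWorkerWithSpec {
    /** Stand-in for the UnwantedWorker field of the original class. */
    private final String unwantedWorkerId;

    /** The spec is computed once by the caller and stored directly. */
    private final WorkerSpecStandIn workerResourceSpec;

    UnwantedWorkerWithSpec(String unwantedWorkerId, WorkerSpecStandIn workerResourceSpec) {
        this.unwantedWorkerId = Objects.requireNonNull(unwantedWorkerId);
        this.workerResourceSpec = Objects.requireNonNull(workerResourceSpec);
    }

    String getUnwantedWorkerId() {
        return unwantedWorkerId;
    }

    WorkerSpecStandIn getWorkerResourceSpec() {
        return workerResourceSpec;
    }
}
```

With this shape, the two resource profiles and the slot count would no longer need to travel with the unwanted worker.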
##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java:
##########
@@ -89,8 +91,17 @@
/** Number of requested and not registered workers per worker resource spec. */
private final WorkerCounter pendingWorkerCounter;
- /** Identifiers and worker resource spec of requested not registered workers. */
- private final Map<ResourceID, WorkerResourceSpec> currentAttemptUnregisteredWorkers;
+ /** Number of requested but not allocated workers per worker resource spec. */
+ private final WorkerCounter pendingRequestCounter;
Review Comment:
Could you explain how `pendingRequestCounter` differs from
`pendingWorkerCounter`, and why we need it?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/UnwantedWorker.java:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.runtime.instance.InstanceID;
+
+import java.util.Objects;
+
+/** Unwanted workers of {@link SlotManager}. */
+public class UnwantedWorker {
+ private final InstanceID id;
+ private final Exception exception;
Review Comment:
I'm not sure about adding an exception for each unwanted worker in the
declaration. It seems to assume that the unwanted workers will be closed with
the given exception, which goes against the idea that the unwanted workers
should only be a hint.
Currently, there are only two possible reasons for the exception here:
reaching the max resource limit, and TM idle timeout. For these two, I think
we can simply close the TM connection with a reason like "slot manager has
determined that the resource is no longer needed", and log why that
determination was made on the slot manager side.
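A minimal sketch of this split, assuming the declaration carries only worker ids: the specific reason is logged where the decision is made, and the connection is closed with one generic message. `UnwantedReleaseSketch`, its methods, and the in-memory log list are all hypothetical stand-ins for illustration and are not part of Flink.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration: log the reason at the slot manager, close with a generic reason. */
final class UnwantedReleaseSketch {
    /** Generic close reason; the wording follows the review comment. */
    static final String CLOSE_REASON =
            "slot manager has determined that the resource is no longer needed";

    /** Collected log lines; a real implementation would use the slot manager's logger. */
    static final List<String> slotManagerLog = new ArrayList<>();

    /** Slot manager side: records why the worker became unwanted and hands on only its id. */
    static String markUnwanted(String instanceId, String reason) {
        slotManagerLog.add("Worker " + instanceId + " is unwanted: " + reason);
        return instanceId;
    }

    /** Resource manager side: builds the close message without any per-worker exception. */
    static String closeConnectionMessage(String instanceId) {
        return "Closing connection to " + instanceId + ": " + CLOSE_REASON;
    }
}
```

For example, `markUnwanted("tm-1", "idle timeout")` would record the idle timeout on the slot manager side, while `closeConnectionMessage("tm-1")` carries only the generic reason.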
##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java:
##########
@@ -384,6 +396,82 @@ && isMaxTotalResourceExceededAfterAdding(totalResourceProfile)) {
}
}
+ private void declareNeededResourcesWithDelay() {
+ if (declareNeededResourceDelay.toMillis() <= 0) {
Review Comment:
It might be better to check that the allocator is supported. IIUC, that's
expected whenever this method is called.
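A minimal sketch of such a check, assuming the allocator exposes an `isSupported()`-style query. `DeclareWithDelaySketch` and `AllocatorStandIn` are hypothetical, and the meaning of the delay branch (a non-positive delay means "declare immediately") is an assumption, since only the condition is visible in the diff. In Flink itself the check would presumably be written with `Preconditions.checkState`.

```java
/** Hypothetical illustration of failing fast when no supported allocator is present. */
final class DeclareWithDelaySketch {
    /** Stand-in for the resource allocator; only the capability check matters here. */
    interface AllocatorStandIn {
        boolean isSupported();
    }

    /**
     * Throws if the allocator is not supported, then reports whether the declaration
     * would happen immediately (non-positive delay) or be scheduled for later.
     */
    static boolean declaresImmediately(AllocatorStandIn allocator, long delayMillis) {
        if (!allocator.isSupported()) {
            throw new IllegalStateException(
                    "declaring needed resources requires a supported resource allocator");
        }
        return delayMillis <= 0;
    }
}
```

Checking up front turns a silently wrong call into an immediate failure, which makes the method's expectation explicit.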
--
This is an automated message from the Apache Git Service.