xintongsong commented on code in PR #21496:
URL: https://github.com/apache/flink/pull/21496#discussion_r1057033790
##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java:
##########
@@ -290,13 +321,79 @@ public void onError(Throwable exception) {
// Internal
// ------------------------------------------------------------------------
- private void releaseResource(InstanceID instanceId, Exception cause) {
+ @VisibleForTesting
+ public int releaseUnWantedResources(
+ Collection<InstanceID> unwantedWorkers, int needReleaseWorkerNumber) {
+
+ Exception cause =
+ new FlinkExpectedException(
+ "slot manager has determined that the resource is no longer needed");
+ for (InstanceID unwantedWorker : unwantedWorkers) {
+ if (needReleaseWorkerNumber <= 0) {
+ break;
+ }
+ if (releaseResource(unwantedWorker, cause)) {
+ needReleaseWorkerNumber--;
+ }
+ }
+ return needReleaseWorkerNumber;
+ }
+
+ private void checkResourceDeclarations() {
Review Comment:
Minor: it might be better to swap the order of `checkResourceDeclarations` and
`releaseUnWantedResources`. Usually, when method A calls method B, B goes
below A.
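A simplified sketch of the release loop from the hunk above, with the caller placed above the callee as suggested. It is a hypothetical stand-in, not the PR's code: worker IDs are plain `String`s instead of `InstanceID`, a `Predicate<String>` replaces `releaseResource(InstanceID, Exception)`, and the "excess = registered minus declared" logic in `checkResourceDeclarations` is an assumption made only so the caller has something to compute.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical, simplified stand-in for the RM internals; IDs are Strings. */
public class ReleaseSketch {

    // Stand-in for releaseResource(...): returns true if the release was accepted.
    private final Predicate<String> releaseResource;

    ReleaseSketch(Predicate<String> releaseResource) {
        this.releaseResource = releaseResource;
    }

    // Caller first: decides how many workers to release (assumed logic), then delegates.
    int checkResourceDeclarations(int registered, int declared, Collection<String> unwanted) {
        int excess = Math.max(0, registered - declared);
        return releaseUnwantedResources(unwanted, excess);
    }

    // Callee below its caller: releases up to needRelease unwanted workers and
    // returns how many still need releasing (e.g. because some releases failed).
    int releaseUnwantedResources(Collection<String> unwanted, int needRelease) {
        for (String worker : unwanted) {
            if (needRelease <= 0) {
                break;
            }
            if (releaseResource.test(worker)) {
                needRelease--;
            }
        }
        return needRelease;
    }

    public static void main(String[] args) {
        List<String> released = new ArrayList<>();
        // Releasing "w2" fails; every other release succeeds.
        ReleaseSketch rm = new ReleaseSketch(w -> !w.equals("w2") && released.add(w));
        int remaining = rm.checkResourceDeclarations(4, 2, List.of("w1", "w2", "w3"));
        System.out.println(released + " remaining=" + remaining);
    }
}
```

Running `main` releases `w1` and `w3` (skipping the failed `w2`) and reports `remaining=0`, mirroring how the PR's loop only decrements the counter on a successful release.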
##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceAllocator.java:
##########
@@ -41,12 +43,15 @@ public interface ResourceAllocator {
*
* @param resourceID identifying which resource to release
*/
- void releaseResource(ResourceID resourceID);
+ void cleaningUpDisconnectedResource(ResourceID resourceID);
Review Comment:
Minor: this rename probably belongs in a separate commit.
##########
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceAllocator.java:
##########
@@ -48,21 +41,12 @@ public boolean isSupported() {
}
@Override
- public void releaseResource(InstanceID instanceId, Exception cause) {
- releaseResourceConsumer.accept(instanceId, cause);
- }
-
public void cleaningUpDisconnectedResource(ResourceID resourceID) {
throw new UnsupportedOperationException();
}
- @Override
- public void allocateResource(WorkerResourceSpec workerResourceSpec) {
- allocateResourceConsumer.accept(workerResourceSpec);
- }
-
@Override
public void declareResourceNeeded(Collection<ResourceDeclaration> resourceDeclarations) {
- //
+ declareResourceNeededConsumer.accept(resourceDeclarations);
Review Comment:
Minor: `declareResourceNeededConsumer` should be introduced in the previous
commit, together with `declareResourceNeeded()`.
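For context, the consumer-hook test double that this comment refers to looks roughly like the stripped-down sketch below; the hook and the method it backs only make sense together, which is why they belong in the same commit. The types are simplified assumptions: `Allocator` stands in for `ResourceAllocator`, and `String` replaces `ResourceDeclaration`.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical sketch of a consumer-backed test double; types are simplified. */
public class TestingAllocatorSketch {

    /** Simplified stand-in for ResourceAllocator. */
    interface Allocator {
        void declareResourceNeeded(Collection<String> resourceDeclarations);
    }

    /** The test double forwards every call to a consumer supplied by the test. */
    static final class TestingAllocator implements Allocator {
        private final Consumer<Collection<String>> declareResourceNeededConsumer;

        TestingAllocator(Consumer<Collection<String>> declareResourceNeededConsumer) {
            this.declareResourceNeededConsumer = declareResourceNeededConsumer;
        }

        @Override
        public void declareResourceNeeded(Collection<String> resourceDeclarations) {
            declareResourceNeededConsumer.accept(resourceDeclarations);
        }
    }

    public static void main(String[] args) {
        // The test records every declaration it receives and can assert on it later.
        List<Collection<String>> calls = new ArrayList<>();
        Allocator allocator = new TestingAllocator(calls::add);
        allocator.declareResourceNeeded(List.of("default-spec x2"));
        System.out.println(calls);
    }
}
```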
##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java:
##########
@@ -116,8 +116,8 @@ void start(
* @param initialSlotReport for the new task manager
* @param totalResourceProfile for the new task manager
* @param defaultSlotResourceProfile for the new task manager
- * @return True if the task manager has not been registered before and is registered
- * successfully; otherwise false
+ * @return True if the task manager has been registered before or is registered successfully;
+ * otherwise false
Review Comment:
The new semantics are counterintuitive and hard to understand. Moreover, they
lead to duplicated calls of `ResourceManager#onWorkerRegistered`.
Alternatively, we can introduce an enum type for the return value, e.g.:
```
enum RegistrationResult {
    SUCCESS,  // task manager has not been registered before and is registered successfully
    IGNORED,  // task manager has been registered before and is ignored
    REJECTED, // task manager is rejected and should be disconnected
}
```
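A sketch of how a caller could consume the suggested enum, under stated assumptions: task manager IDs are `String`s, the rejection rule (a fixed maximum number of task managers) is hypothetical, and `onTaskManagerRegistration` / `onWorkerRegisteredCount` are illustrative names, not Flink APIs. It shows why three values avoid the duplicated `onWorkerRegistered` calls: a repeated registration returns `IGNORED` and does not trigger the callback again.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical sketch of the three-valued registration result. */
public class RegistrationSketch {

    enum RegistrationResult {
        SUCCESS,  // not registered before, now registered successfully
        IGNORED,  // registered before, the repeated registration is ignored
        REJECTED, // rejected, the task manager should be disconnected
    }

    private final Set<String> registeredTaskManagers = new HashSet<>();
    private final int maxTaskManagers; // hypothetical rejection criterion
    private int callbackCount = 0;

    RegistrationSketch(int maxTaskManagers) {
        this.maxTaskManagers = maxTaskManagers;
    }

    /** Slot-manager side: reports exactly what happened. */
    RegistrationResult registerTaskManager(String taskManagerId) {
        if (registeredTaskManagers.contains(taskManagerId)) {
            return RegistrationResult.IGNORED;
        }
        if (registeredTaskManagers.size() >= maxTaskManagers) {
            return RegistrationResult.REJECTED;
        }
        registeredTaskManagers.add(taskManagerId);
        return RegistrationResult.SUCCESS;
    }

    /** RM side: only SUCCESS triggers the onWorkerRegistered-style callback. */
    RegistrationResult onTaskManagerRegistration(String taskManagerId) {
        RegistrationResult result = registerTaskManager(taskManagerId);
        switch (result) {
            case SUCCESS:
                callbackCount++; // stands in for onWorkerRegistered(...)
                break;
            case IGNORED:
                break; // already known: no second callback
            case REJECTED:
                break; // the real RM would disconnect the task manager here
        }
        return result;
    }

    int onWorkerRegisteredCount() {
        return callbackCount;
    }
}
```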
##########
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java:
##########
@@ -136,6 +142,251 @@ public void testStartNewWorker() throws Exception {
};
}
+ /** Tests unwanted worker released. */
+ @Test
+ public void testReleaseUnWantedResources() throws Exception {
+ new Context() {
+ {
+ final ResourceID tmResourceId = ResourceID.generate();
+ final CompletableFuture<ResourceID> releaseResourceFuture =
+ new CompletableFuture<>();
+
+ driverBuilder
+ .setRequestResourceFunction(
+ taskExecutorProcessSpec ->
+ CompletableFuture.completedFuture(tmResourceId))
+ .setReleaseResourceConsumer(releaseResourceFuture::complete);
+
+ runTest(
+ () -> {
+ // request new worker
+ runInMainThread(
+ () ->
+ getResourceManager()
+ .requestNewWorker(WORKER_RESOURCE_SPEC));
+
+ CompletableFuture<RegistrationResponse> registerTaskExecutorFuture =
+ registerTaskExecutor(tmResourceId);
+ assertThat(
+ registerTaskExecutorFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS),
+ instanceOf(RegistrationResponse.Success.class));
+
+ InstanceID instanceID =
+ getResourceManager()
+ .getInstanceIdByResourceId(tmResourceId)
+ .get();
+
+ runInMainThread(
+ () ->
+ getResourceManager()
+ .sendSlotReport(
+ tmResourceId,
+ instanceID,
+ new SlotReport(
+ new SlotStatus(
+ new SlotID(tmResourceId, 0),
+ ResourceProfile.ANY)),
+ TIMEOUT_TIME))
+ .get(TIMEOUT_SEC, TimeUnit.SECONDS);
+
+ // release unknown unwanted registered worker.
+ CompletableFuture<Integer> releaseFuture =
+ runInMainThread(
+ () ->
+ getResourceManager()
+ .releaseUnWantedResources(
Review Comment:
`releaseUnwantedResources` is an internal method of the RM rather than part of
its protocol. Tests should not call it directly, because doing so may lead to
inconsistencies and unpredictable behavior.
Instead, the tests can use `getResourceManager().declareResourceNeeded()`.
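A minimal sketch of the encapsulation argument, assuming a drastically simplified RM: a single declared worker count, `String` worker IDs, and hypothetical method names. The public entry point updates the declared state and then triggers the internal release, so the two stay in sync; calling the private helper on its own would skip the state update, which is the kind of inconsistency the comment warns about. This does not reproduce how Flink's RM actually decides what to release.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch: tests drive the public method, never the internal helper. */
public class DeclareSketch {

    private final Set<String> registeredWorkers = new LinkedHashSet<>();
    private final List<String> releasedWorkers = new ArrayList<>();
    private int declaredWorkerNum;

    DeclareSketch(List<String> workers) {
        registeredWorkers.addAll(workers);
        declaredWorkerNum = workers.size();
    }

    /** Public protocol method: updates the declaration, then acts on it. */
    public void declareResourceNeeded(int workerNum, Set<String> unwantedWorkers) {
        declaredWorkerNum = workerNum;
        releaseUnwantedWorkers(unwantedWorkers);
    }

    /** Internal helper: relies on declaredWorkerNum already being up to date. */
    private void releaseUnwantedWorkers(Set<String> unwantedWorkers) {
        for (String worker : unwantedWorkers) {
            if (registeredWorkers.size() <= declaredWorkerNum) {
                break; // enough workers released to match the declaration
            }
            if (registeredWorkers.remove(worker)) {
                releasedWorkers.add(worker);
            }
        }
    }

    List<String> releasedWorkers() {
        return releasedWorkers;
    }

    int registeredWorkerNum() {
        return registeredWorkers.size();
    }
}
```

With three registered workers and a declaration of one, all three marked unwanted, only the first two (in iteration order) are released.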
##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManager.java:
##########
@@ -301,6 +307,45 @@ public Map<WorkerResourceSpec, Integer> getRequiredWorkers() {
: Collections.emptyMap();
}
+ private Collection<ResourceDeclaration> getResourceDeclaration() {
+ final int pendingWorkerNum =
+ MathUtils.divideRoundUp(getNumberPendingTaskManagerSlots(), numSlotsPerWorker);
+ Set<InstanceID> neededRegisteredWorkers = new HashSet<>(taskManagerRegistrations.keySet());
+ neededRegisteredWorkers.removeAll(unWantedWorkers);
+ final int totalWorkerNum = pendingWorkerNum + neededRegisteredWorkers.size();
+
+ return Collections.singleton(
+ new ResourceDeclaration(
+ defaultWorkerResourceSpec, totalWorkerNum, new HashSet<>(unWantedWorkers)));
+ }
+
+ private void declareNeededResourcesWithDelay() {
Review Comment:
The `xxxWithDelay` pattern has been repeated 4 times
(declarative/fine-grained slot manager, check resource requirements / declare
needed resources), and there could be more in the future. I wonder if it can be
deduplicated into a utility. Not necessary for this PR though; this PR is already
quite complicated.
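One possible shape for such a utility, sketched under assumptions: the pattern is taken to mean "run the action after a delay, coalescing all requests made while a run is already pending, and run immediately when the delay is zero". `DelayedRunner` and `runWithDelay` are hypothetical names, and a plain `ScheduledExecutorService` stands in for whatever main-thread executor the slot managers actually use.

```java
import java.time.Duration;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical helper: requests within one delay window trigger a single run. */
public class DelayedRunner {
    private final Runnable action;
    private final Duration delay;
    private final ScheduledExecutorService scheduler;
    // true while a run is scheduled but has not started yet
    private final AtomicBoolean scheduled = new AtomicBoolean(false);

    public DelayedRunner(Runnable action, Duration delay, ScheduledExecutorService scheduler) {
        this.action = action;
        this.delay = delay;
        this.scheduler = scheduler;
    }

    public void runWithDelay() {
        if (delay.isZero() || delay.isNegative()) {
            action.run(); // no delay configured: run synchronously
            return;
        }
        // Only the first request in a window schedules a run; later ones are coalesced.
        if (scheduled.compareAndSet(false, true)) {
            scheduler.schedule(
                    () -> {
                        // Reset before running, so requests that arrive while the
                        // action runs schedule a fresh run instead of being dropped.
                        scheduled.set(false);
                        action.run();
                    },
                    delay.toMillis(),
                    TimeUnit.MILLISECONDS);
        }
    }
}
```

Each call site would then hold one `DelayedRunner` per action (for example, one wrapping the resource-requirement check and one wrapping the declaration of needed resources) instead of repeating the scheduling boilerplate.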
--
This is an automated message from the Apache Git Service.