xintongsong commented on code in PR #20256:
URL: https://github.com/apache/flink/pull/20256#discussion_r920959736


##########
flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java:
##########
@@ -261,6 +261,14 @@ public class ResourceManagerOptions {
                                     + TaskManagerOptions.REGISTRATION_TIMEOUT.key()
                                     + "'.");
 
+    /** Timeout for ResourceManager to recover all the previous attempts workers. */
+    public static final ConfigOption<Duration> RESOURCE_MANAGER_PREVIOUS_WORKER_RECOVERY_TIMEOUT =
+            ConfigOptions.key("resourcemanager.previous-worker.recovery.timeout")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(5))
+                    .withDescription(
+                            "Timeout for resource manager to recover all the previous attempts workers.");

Review Comment:
   I think the description can be improved to be more helpful for regular users, e.g., by explaining what happens when the timeout is reached and how users should tune this option.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java:
##########
@@ -102,6 +103,12 @@
      */
     private CompletableFuture<Void> startWorkerCoolDown;
 
+    /** The future indicates whether the rm is ready to serve. */
+    private final CompletableFuture<Acknowledge> recoveryFuture;

Review Comment:
   ```suggestion
       private final CompletableFuture<Void> recoveryFuture;
   ```
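The suggested `CompletableFuture<Void>` fits because the future only signals readiness and never carries a meaningful `Acknowledge` value. A minimal plain-Java sketch; the class and method names are hypothetical, not Flink's:

```java
import java.util.concurrent.CompletableFuture;

// Sketch: a readiness signal modelled as CompletableFuture<Void> (plain Java, no Flink types).
class ReadySignalSketch {
    // Void fits a pure "ready" signal: there is no meaningful value to hand downstream.
    private final CompletableFuture<Void> recoveryFuture = new CompletableFuture<>();

    void markRecovered() {
        // complete(null) is the idiomatic way to finish a CompletableFuture<Void>.
        recoveryFuture.complete(null);
    }

    CompletableFuture<String> afterRecovery(String request) {
        // Dependent stages ignore the null result; they only rely on the ordering.
        return recoveryFuture.thenApply(ignored -> "processed " + request);
    }

    public static void main(String[] args) {
        ReadySignalSketch rm = new ReadySignalSketch();
        CompletableFuture<String> pending = rm.afterRecovery("req-1");
        System.out.println(pending.isDone()); // false
        rm.markRecovered();
        System.out.println(pending.join()); // processed req-1
    }
}
```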



##########
flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java:
##########
@@ -261,6 +261,14 @@ public class ResourceManagerOptions {
                                     + TaskManagerOptions.REGISTRATION_TIMEOUT.key()
                                     + "'.");
 
+    /** Timeout for ResourceManager to recover all the previous attempts workers. */
+    public static final ConfigOption<Duration> RESOURCE_MANAGER_PREVIOUS_WORKER_RECOVERY_TIMEOUT =
+            ConfigOptions.key("resourcemanager.previous-worker.recovery.timeout")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(5))

Review Comment:
   I think the default value should be `0`, to align with the previous behavior.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java:
##########
@@ -326,6 +350,7 @@ private void scheduleWorkerRegistrationTimeoutCheck(final ResourceID resourceId)
                                 "Worker {} did not register in {}, will stop it and request a new one if needed.",
                                 resourceId,
                                 workerRegistrationTimeout);
+                        tryRemovePreviousPendingRecoveryTaskManager(resourceId, "Register timeout");

Review Comment:
   We should not need this, because it is covered by `internalStopWorker`.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java:
##########
@@ -244,6 +254,20 @@ public void onPreviousAttemptWorkersRecovered(Collection<WorkerType> recoveredWo
                     "Worker {} recovered from previous attempt.",
                     resourceId.getStringWithMetadata());
         }
+        if (recoveredWorkers.size() > 0) {
+            scheduleRunAsync(
+                    () -> {
+                        if (!recoveryFuture.isDone()) {

Review Comment:
   No need to check for `recoveryFuture.isDone()`.
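The `isDone()` guard is redundant because `CompletableFuture.complete(value)` only takes effect on an incomplete future; on an already-completed one it returns `false` and leaves the result untouched. A plain-Java check of that behavior (the class name is hypothetical):

```java
import java.util.concurrent.CompletableFuture;

class CompleteIdempotenceSketch {
    // Returns the results of two successive complete(null) calls on the same future.
    static boolean[] completeTwice() {
        CompletableFuture<Void> recoveryFuture = new CompletableFuture<>();
        // The first call transitions the future and returns true.
        boolean first = recoveryFuture.complete(null);
        // A second call is a harmless no-op: it returns false and changes nothing.
        boolean second = recoveryFuture.complete(null);
        return new boolean[] {first, second};
    }

    public static void main(String[] args) {
        boolean[] results = completeTwice();
        System.out.println(results[0] + " " + results[1]); // true false
    }
}
```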



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java:
##########
@@ -418,6 +443,30 @@ private void tryResetWorkerCreationCoolDown() {
         }
     }
 
+    @Override
+    public CompletableFuture<Acknowledge> getRecoveryFuture() {
+        return recoveryFuture;
+    }
+
+    private void tryRemovePreviousPendingRecoveryTaskManager(ResourceID resourceID, String reason) {
+        long sizeBeforeRemove = previousAttemptUnregisteredWorkers.size();
+        boolean exist = previousAttemptUnregisteredWorkers.remove(resourceID);
+        if (exist) {
+            if (!recoveryFuture.isDone()) {

Review Comment:
   No need to check for `exist` and `recoveryFuture.isDone()`.
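Both checks can go because `Set.remove` simply returns `false` for an unknown ID and `complete` is a no-op on a finished future. A hypothetical simplification of the method in plain Java, with `String` standing in for `ResourceID` and the `reason` parameter omitted; it assumes an empty set always means there is nothing left to wait for:

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Hypothetical simplification of the reviewed method; String stands in for ResourceID.
class PendingRecoverySketch {
    final Set<String> previousAttemptUnregisteredWorkers = new HashSet<>();
    final CompletableFuture<Void> readyToServeFuture = new CompletableFuture<>();

    void tryRemovePreviousPendingRecoveryTaskManager(String resourceId) {
        // Set.remove returns false for unknown IDs, so no "exist" flag is needed.
        previousAttemptUnregisteredWorkers.remove(resourceId);
        // complete() is a no-op on a completed future, so no isDone() guard is needed.
        if (previousAttemptUnregisteredWorkers.isEmpty()) {
            readyToServeFuture.complete(null);
        }
    }

    public static void main(String[] args) {
        PendingRecoverySketch rm = new PendingRecoverySketch();
        rm.previousAttemptUnregisteredWorkers.add("tm-1");
        rm.previousAttemptUnregisteredWorkers.add("tm-2");
        rm.tryRemovePreviousPendingRecoveryTaskManager("unknown"); // ignored
        rm.tryRemovePreviousPendingRecoveryTaskManager("tm-1");
        System.out.println(rm.readyToServeFuture.isDone()); // false
        rm.tryRemovePreviousPendingRecoveryTaskManager("tm-2");
        rm.tryRemovePreviousPendingRecoveryTaskManager("tm-2"); // repeated call is harmless
        System.out.println(rm.readyToServeFuture.isDone()); // true
    }
}
```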



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java:
##########
@@ -531,9 +531,12 @@ public CompletableFuture<Acknowledge> declareRequiredResources(
 
         if (null != jobManagerRegistration) {
             if (Objects.equals(jobMasterId, jobManagerRegistration.getJobMasterId())) {
-                slotManager.processResourceRequirements(resourceRequirements);
-
-                return CompletableFuture.completedFuture(Acknowledge.get());
+                return getRecoveryFuture()
+                        .thenApply(
+                                acknowledge -> {
+                                    slotManager.processResourceRequirements(resourceRequirements);

Review Comment:
   ```suggestion
                                       validateRunsInMainThread();
                                       slotManager.processResourceRequirements(resourceRequirements);
   ```
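One plausible reason for the suggested `validateRunsInMainThread()` (the review does not spell it out): a callback passed to `thenApply` runs on whichever thread completes the future, or on the calling thread if the future is already complete, so main-thread state such as the slot manager is only touched safely if the recovery future is completed from the main thread. The sketch models the main thread with a single-threaded executor; `MainThreadCallbackSketch` and its local `validateRunsInMainThread` are hypothetical stand-ins, not Flink's implementations:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical single-threaded "main thread" model; not Flink's RpcEndpoint.
class MainThreadCallbackSketch {
    static final String MAIN_THREAD_NAME = "rm-main-thread";

    final ExecutorService mainThreadExecutor =
            Executors.newSingleThreadExecutor(r -> new Thread(r, MAIN_THREAD_NAME));
    final CompletableFuture<Void> readyToServeFuture = new CompletableFuture<>();

    // Local stand-in for a main-thread assertion.
    static void validateRunsInMainThread() {
        String current = Thread.currentThread().getName();
        if (!MAIN_THREAD_NAME.equals(current)) {
            throw new IllegalStateException("Expected main thread but was " + current);
        }
    }

    // Assumed to be invoked on the main thread, like an RPC handler.
    CompletableFuture<String> declareRequiredResources(String requirements) {
        return readyToServeFuture.thenApply(
                ignored -> {
                    // Runs on the thread that completes the future (or on the caller if it is
                    // already complete); the assertion catches a completion from elsewhere.
                    validateRunsInMainThread();
                    return "processed " + requirements + " on " + Thread.currentThread().getName();
                });
    }

    static String demo() {
        MainThreadCallbackSketch rm = new MainThreadCallbackSketch();
        try {
            // Handle the request on the main thread, then complete readiness there as well.
            CompletableFuture<String> response =
                    CompletableFuture.supplyAsync(
                                    () -> rm.declareRequiredResources("2 slots"),
                                    rm.mainThreadExecutor)
                            .thenCompose(inner -> inner);
            rm.mainThreadExecutor.execute(() -> rm.readyToServeFuture.complete(null));
            return response.join();
        } finally {
            rm.mainThreadExecutor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // processed 2 slots on rm-main-thread
    }
}
```

If `readyToServeFuture.complete(null)` were called from another thread instead, the callback would run there and the assertion would fail the returned future rather than silently touching main-thread state.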



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java:
##########
@@ -1222,6 +1225,14 @@ protected abstract void internalDeregisterApplication(
      */
     public abstract boolean stopWorker(WorkerType worker);
 
+    /**
+     * Get the recovery future of the resource manager.
+     *
     * @return The recovery future of the resource manager, which indicated whether it is ready to
+     *     serve.
+     */
+    protected abstract CompletableFuture<Acknowledge> getRecoveryFuture();

Review Comment:
   I'd suggest naming this something like `getReadyToServeFuture()`, which directly describes its semantics.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java:
##########
@@ -244,6 +254,20 @@ public void onPreviousAttemptWorkersRecovered(Collection<WorkerType> recoveredWo
                     "Worker {} recovered from previous attempt.",
                     resourceId.getStringWithMetadata());
         }
+        if (recoveredWorkers.size() > 0) {

Review Comment:
   ```suggestion
           if (recoveredWorkers.size() > 0 && !previousWorkerRecoverTimeout.isZero()) {
   ```
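With this guard, a zero `previousWorkerRecoverTimeout` skips the wait entirely, which is how the `0` default proposed in the earlier comment keeps the previous behavior. A plain-Java sketch, assuming a `ScheduledExecutorService` in place of Flink's `scheduleRunAsync` and assuming the future is completed right away when no wait is scheduled (the hunk does not show that branch); all names other than those in the suggestion are hypothetical:

```java
import java.time.Duration;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the guarded recovery wait; Strings stand in for recovered workers.
class RecoveryTimeoutGuardSketch {
    final CompletableFuture<Void> readyToServeFuture = new CompletableFuture<>();
    // Drained as workers register (not shown here).
    final Set<String> previousAttemptUnregisteredWorkers = new HashSet<>();

    void onPreviousAttemptWorkersRecovered(
            Collection<String> recoveredWorkers,
            Duration previousWorkerRecoverTimeout,
            ScheduledExecutorService mainThread) {
        previousAttemptUnregisteredWorkers.addAll(recoveredWorkers);
        if (recoveredWorkers.size() > 0 && !previousWorkerRecoverTimeout.isZero()) {
            // Wait for the recovered workers to register, but give up once the timeout fires.
            mainThread.schedule(
                    () -> readyToServeFuture.complete(null),
                    previousWorkerRecoverTimeout.toMillis(),
                    TimeUnit.MILLISECONDS);
        } else {
            // Assumption: with a zero timeout or no recovered workers, serve immediately.
            readyToServeFuture.complete(null);
        }
    }

    // Reports whether the future was already complete right after the callback returned.
    static boolean readyImmediately(Duration timeout, List<String> recoveredWorkers) {
        ScheduledExecutorService mainThread = Executors.newSingleThreadScheduledExecutor();
        try {
            RecoveryTimeoutGuardSketch rm = new RecoveryTimeoutGuardSketch();
            rm.onPreviousAttemptWorkersRecovered(recoveredWorkers, timeout, mainThread);
            boolean immediate = rm.readyToServeFuture.isDone();
            rm.readyToServeFuture.join(); // for a non-zero timeout, blocks until the timer fires
            return immediate;
        } finally {
            mainThread.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(readyImmediately(Duration.ZERO, List.of("tm-1"))); // true
        System.out.println(readyImmediately(Duration.ofMillis(200), List.of("tm-1"))); // false
        System.out.println(readyImmediately(Duration.ofMillis(200), List.of())); // true
    }
}
```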



-- 
This is an automated message from the Apache Git Service.