xintongsong commented on code in PR #20256:
URL: https://github.com/apache/flink/pull/20256#discussion_r920959736
##########
flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java:
##########
@@ -261,6 +261,14 @@ public class ResourceManagerOptions {
+ TaskManagerOptions.REGISTRATION_TIMEOUT.key()
+ "'.");
+ /** Timeout for ResourceManager to recover all the previous attempts workers. */
+ public static final ConfigOption<Duration> RESOURCE_MANAGER_PREVIOUS_WORKER_RECOVERY_TIMEOUT =
+ ConfigOptions.key("resourcemanager.previous-worker.recovery.timeout")
+ .durationType()
+ .defaultValue(Duration.ofSeconds(5))
+ .withDescription(
+ "Timeout for resource manager to recover all the previous attempts workers.");
Review Comment:
I think the description could be more helpful for ordinary users, e.g., by
explaining what happens when the timeout is reached and how users should
tune this option.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java:
##########
@@ -102,6 +103,12 @@
*/
private CompletableFuture<Void> startWorkerCoolDown;
+ /** The future indicates whether the rm is ready to serve. */
+ private final CompletableFuture<Acknowledge> recoveryFuture;
Review Comment:
```suggestion
private final CompletableFuture<Void> recoveryFuture;
```
##########
flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java:
##########
@@ -261,6 +261,14 @@ public class ResourceManagerOptions {
+ TaskManagerOptions.REGISTRATION_TIMEOUT.key()
+ "'.");
+ /** Timeout for ResourceManager to recover all the previous attempts workers. */
+ public static final ConfigOption<Duration> RESOURCE_MANAGER_PREVIOUS_WORKER_RECOVERY_TIMEOUT =
+ ConfigOptions.key("resourcemanager.previous-worker.recovery.timeout")
+ .durationType()
+ .defaultValue(Duration.ofSeconds(5))
Review Comment:
I think the default value should be `0`, to align with the previous
behavior.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java:
##########
@@ -326,6 +350,7 @@ private void scheduleWorkerRegistrationTimeoutCheck(final ResourceID resourceId)
"Worker {} did not register in {}, will stop it and request a new one if needed.",
resourceId,
workerRegistrationTimeout);
+ tryRemovePreviousPendingRecoveryTaskManager(resourceId, "Register timeout");
Review Comment:
We should not need this, because it is covered by `internalStopWorker`.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java:
##########
@@ -244,6 +254,20 @@ public void onPreviousAttemptWorkersRecovered(Collection<WorkerType> recoveredWo
"Worker {} recovered from previous attempt.",
resourceId.getStringWithMetadata());
}
+ if (recoveredWorkers.size() > 0) {
+ scheduleRunAsync(
+ () -> {
+ if (!recoveryFuture.isDone()) {
Review Comment:
No need to check for `recoveryFuture.isDone()`.
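To illustrate why the guard is redundant, here is a minimal standalone sketch that relies only on the documented behavior of `java.util.concurrent.CompletableFuture.complete`: a second call on an already completed future returns `false` and leaves the result unchanged. The class and method names are hypothetical and not part of the PR.

```java
import java.util.concurrent.CompletableFuture;

public class IdempotentCompleteSketch {

    /** Completes the future twice and reports what each call returned. */
    static boolean[] completeTwice(CompletableFuture<Void> future) {
        // On a pending future, the first call transitions it to "done" and returns true.
        boolean first = future.complete(null);
        // The second call finds the future already done, changes nothing, and returns false.
        boolean second = future.complete(null);
        return new boolean[] {first, second};
    }

    public static void main(String[] args) {
        CompletableFuture<Void> readyToServe = new CompletableFuture<>();
        boolean[] results = completeTwice(readyToServe);
        // prints "true false true"
        System.out.println(results[0] + " " + results[1] + " " + readyToServe.isDone());
    }
}
```

Since calling `complete` on a finished future is harmless, the caller can complete unconditionally.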
##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java:
##########
@@ -418,6 +443,30 @@ private void tryResetWorkerCreationCoolDown() {
}
}
+ @Override
+ public CompletableFuture<Acknowledge> getRecoveryFuture() {
+ return recoveryFuture;
+ }
+
+ private void tryRemovePreviousPendingRecoveryTaskManager(ResourceID resourceID, String reason) {
+ long sizeBeforeRemove = previousAttemptUnregisteredWorkers.size();
+ boolean exist = previousAttemptUnregisteredWorkers.remove(resourceID);
+ if (exist) {
+ if (!recoveryFuture.isDone()) {
Review Comment:
No need to check `exist` or `recoveryFuture.isDone()`.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java:
##########
@@ -531,9 +531,12 @@ public CompletableFuture<Acknowledge> declareRequiredResources(
if (null != jobManagerRegistration) {
if (Objects.equals(jobMasterId, jobManagerRegistration.getJobMasterId())) {
- slotManager.processResourceRequirements(resourceRequirements);
-
- return CompletableFuture.completedFuture(Acknowledge.get());
+ return getRecoveryFuture()
+ .thenApply(
+ acknowledge -> {
+ slotManager.processResourceRequirements(resourceRequirements);
Review Comment:
```suggestion
validateRunsInMainThread();
slotManager.processResourceRequirements(resourceRequirements);
```
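A possible reason for the extra check, sketched under the assumption that the future could be completed from a thread other than the endpoint's main thread: a callback registered with `thenApply` on a still-pending future runs on whichever thread completes it. The class, method, and thread names below are hypothetical, and `validateRunsInMainThread()` itself is not reproduced here.

```java
import java.util.concurrent.CompletableFuture;

public class CallbackThreadSketch {

    /** Returns the name of the thread that executed the thenApply callback. */
    static String callbackThreadName(String completerName) {
        CompletableFuture<Void> ready = new CompletableFuture<>();
        // Registered while the future is still pending, so the callback runs later,
        // on the thread that eventually completes the future.
        CompletableFuture<String> ranOn =
                ready.thenApply(ignored -> Thread.currentThread().getName());
        Thread completer = new Thread(() -> ready.complete(null), completerName);
        completer.start();
        // join() blocks until the callback has produced its result.
        return ranOn.join();
    }

    public static void main(String[] args) {
        // prints "not-the-main-thread"
        System.out.println(callbackThreadName("not-the-main-thread"));
    }
}
```

An assertion at the top of the callback turns such an accidental thread hop into an immediate failure instead of a silent race on the slot manager state.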
##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java:
##########
@@ -1222,6 +1225,14 @@ protected abstract void internalDeregisterApplication(
*/
public abstract boolean stopWorker(WorkerType worker);
+ /**
+ * Get the recovery future of the resource manager.
+ *
+ * @return The recovery future of the resource manager, which indicated whether it is ready to
+ * serve.
+ */
+ protected abstract CompletableFuture<Acknowledge> getRecoveryFuture();
Review Comment:
I'd suggest naming this something like `getReadyToServeFuture()`, which
directly describes its semantics.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java:
##########
@@ -244,6 +254,20 @@ public void onPreviousAttemptWorkersRecovered(Collection<WorkerType> recoveredWo
"Worker {} recovered from previous attempt.",
resourceId.getStringWithMetadata());
}
+ if (recoveredWorkers.size() > 0) {
Review Comment:
```suggestion
if (recoveredWorkers.size() > 0 && !previousWorkerRecoverTimeout.isZero()) {
```
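A minimal sketch of the intended semantics, assuming a zero timeout means "do not wait for previous workers at all". The helper `readyToServe` and its parameters are hypothetical and simplified: the real resource manager would presumably also complete the future once every recovered worker has registered, which is omitted here.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class RecoveryTimeoutSketch {

    /** Hypothetical helper: the returned future is done once serving may start. */
    static CompletableFuture<Void> readyToServe(
            int recoveredWorkers, Duration timeout, ScheduledExecutorService scheduler) {
        CompletableFuture<Void> ready = new CompletableFuture<>();
        if (recoveredWorkers > 0 && !timeout.isZero()) {
            // Wait for previous workers, but give up once the timeout elapses.
            scheduler.schedule(
                    () -> ready.complete(null), timeout.toMillis(), TimeUnit.MILLISECONDS);
        } else {
            // Nothing to wait for, or waiting is disabled: ready immediately.
            ready.complete(null);
        }
        return ready;
    }

    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            // Zero timeout: done right away, matching the old behavior.
            System.out.println(readyToServe(3, Duration.ZERO, scheduler).isDone());
            // Non-zero timeout with recovered workers: still pending at this point.
            System.out.println(readyToServe(3, Duration.ofSeconds(30), scheduler).isDone());
        } finally {
            scheduler.shutdownNow();
        }
    }
}
```

With this shape, a default of `0` keeps existing deployments unchanged, and only users who opt in to a positive timeout see requests deferred.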