This is an automated email from the ASF dual-hosted git repository.
xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 2b9b9859253 [FLINK-24713][Runtime/Coordination] Postpone
resourceManager serving after the recovery phase has finished
2b9b9859253 is described below
commit 2b9b9859253698c3c90ca420f10975e27e6c52d4
Author: Aitozi <[email protected]>
AuthorDate: Mon May 23 12:02:18 2022 +0800
[FLINK-24713][Runtime/Coordination] Postpone resourceManager serving after
the recovery phase has finished
This closes #20256
---
.../generated/resource_manager_configuration.html | 6 ++
.../configuration/ResourceManagerOptions.java | 11 +++
.../runtime/resourcemanager/ResourceManager.java | 18 ++++-
.../resourcemanager/StandaloneResourceManager.java | 6 ++
.../active/ActiveResourceManager.java | 46 +++++++++++-
.../active/ActiveResourceManagerFactory.java | 5 ++
.../resourcemanager/ResourceManagerTest.java | 85 ++++++++++++++++++++--
.../resourcemanager/TestingResourceManager.java | 18 ++++-
.../TestingResourceManagerFactory.java | 5 ++
.../active/ActiveResourceManagerTest.java | 77 +++++++++++++++++++-
10 files changed, 262 insertions(+), 15 deletions(-)
diff --git
a/docs/layouts/shortcodes/generated/resource_manager_configuration.html
b/docs/layouts/shortcodes/generated/resource_manager_configuration.html
index 502daf01f72..05296b36e1c 100644
--- a/docs/layouts/shortcodes/generated/resource_manager_configuration.html
+++ b/docs/layouts/shortcodes/generated/resource_manager_configuration.html
@@ -14,6 +14,12 @@
<td>String</td>
<td>Timeout for jobs which don't have a job manager as leader
assigned.</td>
</tr>
+ <tr>
+ <td><h5>resourcemanager.previous-worker.recovery.timeout</h5></td>
+ <td style="word-wrap: break-word;">0 ms</td>
+ <td>Duration</td>
+ <td>Timeout for resource manager to recover all the previous
attempts workers. If exceeded, resource manager will handle new resource
requests by requesting new workers. If you would like to reuse the previous
workers as much as possible, you should configure a longer timeout time to wait
for previous workers to register.</td>
+ </tr>
<tr>
<td><h5>resourcemanager.rpc.port</h5></td>
<td style="word-wrap: break-word;">0</td>
diff --git
a/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
b/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
index 6f8c9ccdb94..af68f9ef21a 100644
---
a/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
+++
b/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
@@ -261,6 +261,17 @@ public class ResourceManagerOptions {
+
TaskManagerOptions.REGISTRATION_TIMEOUT.key()
+ "'.");
+ /** Timeout for the ResourceManager to recover all workers from previous attempts. */
+ public static final ConfigOption<Duration>
RESOURCE_MANAGER_PREVIOUS_WORKER_RECOVERY_TIMEOUT =
+
ConfigOptions.key("resourcemanager.previous-worker.recovery.timeout")
+ .durationType()
+ .defaultValue(Duration.ofSeconds(0))
+ .withDescription(
+ "Timeout for resource manager to recover all the
previous attempts workers. If exceeded,"
+ + " resource manager will handle new
resource requests by requesting new workers."
+ + " If you would like to reuse the
previous workers as much as possible, you should"
+ + " configure a longer timeout time to
wait for previous workers to register.");
+
//
---------------------------------------------------------------------------------------------
/** Not intended to be instantiated. */
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index c5109c2d2d4..9a9a17b4cbc 100755
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -547,9 +547,13 @@ public abstract class ResourceManager<WorkerType extends
ResourceIDRetrievable>
if (null != jobManagerRegistration) {
if (Objects.equals(jobMasterId,
jobManagerRegistration.getJobMasterId())) {
- slotManager.processResourceRequirements(resourceRequirements);
-
- return CompletableFuture.completedFuture(Acknowledge.get());
+ return getReadyToServeFuture()
+ .thenApply(
+ acknowledge -> {
+ validateRunsInMainThread();
+
slotManager.processResourceRequirements(resourceRequirements);
+ return null;
+ });
} else {
return FutureUtils.completedExceptionally(
new ResourceManagerException(
@@ -1252,6 +1256,14 @@ public abstract class ResourceManager<WorkerType extends
ResourceIDRetrievable>
*/
public abstract boolean stopWorker(WorkerType worker);
+ /**
+ * Get the future that completes once the resource manager is ready to serve.
+ *
+ * @return The ready-to-serve future. Resource requirement declarations are only processed once
+ * it completes, so implementations should complete it from the main thread.
+ */
+ protected abstract CompletableFuture<Void> getReadyToServeFuture();
+
/**
* Set {@link SlotManager} whether to fail unfulfillable slot requests.
*
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
index 15f350fd50b..f41f5bb4d79 100755
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
@@ -36,6 +36,7 @@ import org.apache.flink.util.Preconditions;
import javax.annotation.Nullable;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
@@ -126,4 +127,9 @@ public class StandaloneResourceManager extends
ResourceManager<ResourceID> {
TimeUnit.MILLISECONDS);
}
}
+
+ @Override
+ public CompletableFuture<Void> getReadyToServeFuture() {
+ return CompletableFuture.completedFuture(null);
+ }
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java
index 408634bfab1..e5cb5315d63 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java
@@ -103,6 +103,12 @@ public class ActiveResourceManager<WorkerType extends
ResourceIDRetrievable>
*/
private CompletableFuture<Void> startWorkerCoolDown;
+ /** Future that completes once the resource manager is ready to serve. */
+ private final CompletableFuture<Void> readyToServeFuture;
+
+ /** Timeout to wait for all workers from previous attempts to recover. */
+ private final Duration previousWorkerRecoverTimeout;
+
public ActiveResourceManager(
ResourceManagerDriver<WorkerType> resourceManagerDriver,
Configuration flinkConfig,
@@ -121,6 +127,7 @@ public class ActiveResourceManager<WorkerType extends
ResourceIDRetrievable>
ThresholdMeter startWorkerFailureRater,
Duration retryInterval,
Duration workerRegistrationTimeout,
+ Duration previousWorkerRecoverTimeout,
Executor ioExecutor) {
super(
rpcService,
@@ -150,6 +157,8 @@ public class ActiveResourceManager<WorkerType extends
ResourceIDRetrievable>
this.startWorkerRetryInterval = retryInterval;
this.workerRegistrationTimeout = workerRegistrationTimeout;
this.startWorkerCoolDown = FutureUtils.completedVoidFuture();
+ this.previousWorkerRecoverTimeout = previousWorkerRecoverTimeout;
+ this.readyToServeFuture = new CompletableFuture<>();
}
// ------------------------------------------------------------------------
@@ -213,7 +222,7 @@ public class ActiveResourceManager<WorkerType extends
ResourceIDRetrievable>
final WorkerResourceSpec workerResourceSpec =
currentAttemptUnregisteredWorkers.remove(resourceId);
- previousAttemptUnregisteredWorkers.remove(resourceId);
+ tryRemovePreviousPendingRecoveryTaskManager(resourceId);
if (workerResourceSpec != null) {
final int count =
pendingWorkerCounter.decreaseAndGet(workerResourceSpec);
log.info(
@@ -251,6 +260,18 @@ public class ActiveResourceManager<WorkerType extends
ResourceIDRetrievable>
"Worker {} recovered from previous attempt.",
resourceId.getStringWithMetadata());
}
+ if (recoveredWorkers.size() > 0 &&
!previousWorkerRecoverTimeout.isZero()) {
+ scheduleRunAsync(
+ () -> {
+ readyToServeFuture.complete(null);
+ log.info(
+ "Timeout to wait recovery taskmanagers,
recovery future is completed.");
+ },
+ previousWorkerRecoverTimeout.toMillis(),
+ TimeUnit.MILLISECONDS);
+ } else {
+ readyToServeFuture.complete(null);
+ }
}
@Override
@@ -367,7 +388,7 @@ public class ActiveResourceManager<WorkerType extends
ResourceIDRetrievable>
WorkerResourceSpec workerResourceSpec =
currentAttemptUnregisteredWorkers.remove(resourceId);
- previousAttemptUnregisteredWorkers.remove(resourceId);
+ tryRemovePreviousPendingRecoveryTaskManager(resourceId);
if (workerResourceSpec != null) {
final int count =
pendingWorkerCounter.decreaseAndGet(workerResourceSpec);
log.info(
@@ -425,6 +446,27 @@ public class ActiveResourceManager<WorkerType extends
ResourceIDRetrievable>
}
}
+ @Override
+ public CompletableFuture<Void> getReadyToServeFuture() {
+ return readyToServeFuture;
+ }
+
+ private void tryRemovePreviousPendingRecoveryTaskManager(ResourceID
resourceID) {
+ long sizeBeforeRemove = previousAttemptUnregisteredWorkers.size();
+ if (previousAttemptUnregisteredWorkers.remove(resourceID)) {
+ log.info(
+ "Pending recovery taskmanagers {} -> {}.{}",
+ sizeBeforeRemove,
+ previousAttemptUnregisteredWorkers.size(),
+ previousAttemptUnregisteredWorkers.size() == 0
+ ? " Resource manager is ready to serve."
+ : "");
+ }
+ if (previousAttemptUnregisteredWorkers.size() == 0) {
+ readyToServeFuture.complete(null);
+ }
+ }
+
/** Always execute on the current main thread executor. */
private class GatewayMainThreadExecutor implements ScheduledExecutor {
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerFactory.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerFactory.java
index 94a378404b8..dd39914c6b0 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerFactory.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerFactory.java
@@ -109,6 +109,10 @@ public abstract class
ActiveResourceManagerFactory<WorkerType extends ResourceID
configuration.get(ResourceManagerOptions.START_WORKER_RETRY_INTERVAL);
final Duration workerRegistrationTimeout =
configuration.get(ResourceManagerOptions.TASK_MANAGER_REGISTRATION_TIMEOUT);
+ final Duration previousWorkerRecoverTimeout =
+ configuration.get(
+
ResourceManagerOptions.RESOURCE_MANAGER_PREVIOUS_WORKER_RECOVERY_TIMEOUT);
+
return new ActiveResourceManager<>(
createResourceManagerDriver(
configuration, webInterfaceUrl,
rpcService.getAddress()),
@@ -128,6 +132,7 @@ public abstract class
ActiveResourceManagerFactory<WorkerType extends ResourceID
failureRater,
retryInterval,
workerRegistrationTimeout,
+ previousWorkerRecoverTimeout,
ioExecutor);
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
index b69ca94168e..eba7a62bd5b 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
@@ -300,6 +300,69 @@ class ResourceManagerTest {
assertThat(clearRequirementsFuture.get(5,
TimeUnit.SECONDS)).isEqualTo(jobId);
}
+ @Test
+ void testProcessResourceRequirementsWhenRecoveryFinished() throws
Exception {
+ final TestingJobMasterGateway jobMasterGateway =
+ new TestingJobMasterGatewayBuilder()
+ .setAddress(UUID.randomUUID().toString())
+ .build();
+ rpcService.registerGateway(jobMasterGateway.getAddress(),
jobMasterGateway);
+
+ final JobLeaderIdService jobLeaderIdService =
+ TestingJobLeaderIdService.newBuilder()
+ .setGetLeaderIdFunction(
+ jobId ->
+ CompletableFuture.completedFuture(
+
jobMasterGateway.getFencingToken()))
+ .build();
+
+ final CompletableFuture<Void> processRequirementsFuture = new
CompletableFuture<>();
+ final CompletableFuture<Void> readyToServeFuture = new
CompletableFuture<>();
+
+ final SlotManager slotManager =
+ new TestingSlotManagerBuilder()
+ .setProcessRequirementsConsumer(
+ r -> processRequirementsFuture.complete(null))
+ .createSlotManager();
+ resourceManager =
+ new ResourceManagerBuilder()
+ .withJobLeaderIdService(jobLeaderIdService)
+ .withSlotManager(slotManager)
+ .withReadyToServeFuture(readyToServeFuture)
+ .buildAndStart();
+
+ final JobID jobId = JobID.generate();
+ final ResourceManagerGateway resourceManagerGateway =
+ resourceManager.getSelfGateway(ResourceManagerGateway.class);
+ resourceManagerGateway
+ .registerJobMaster(
+ jobMasterGateway.getFencingToken(),
+ ResourceID.generate(),
+ jobMasterGateway.getAddress(),
+ jobId,
+ TIMEOUT)
+ .get();
+
+ resourceManagerGateway.declareRequiredResources(
+ jobMasterGateway.getFencingToken(),
+ ResourceRequirements.create(
+ jobId,
+ jobMasterGateway.getAddress(),
+ Collections.singleton(
+
ResourceRequirement.create(ResourceProfile.UNKNOWN, 1))),
+ TIMEOUT);
+ resourceManager
+ .runInMainThread(
+ () -> {
+
assertThat(processRequirementsFuture.isDone()).isFalse();
+ readyToServeFuture.complete(null);
+
assertThat(processRequirementsFuture.isDone()).isTrue();
+ return null;
+ },
+ TIMEOUT)
+ .get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
+ }
+
@Test
void testHeartbeatTimeoutWithJobMaster() throws Exception {
final CompletableFuture<ResourceID> heartbeatRequestFuture = new
CompletableFuture<>();
@@ -427,12 +490,11 @@ class ResourceManagerTest {
stopWorkerFuture.complete(worker);
return true;
}),
- resourceManagerGateway -> {
- registerTaskExecutor(
- resourceManagerGateway,
- taskExecutorId,
- taskExecutorGateway.getAddress());
- },
+ resourceManagerGateway ->
+ registerTaskExecutor(
+ resourceManagerGateway,
+ taskExecutorId,
+ taskExecutorGateway.getAddress()),
resourceManagerResourceId -> {
// might have been completed or not depending whether the
timeout was triggered
// first
@@ -727,6 +789,8 @@ class ResourceManagerTest {
private BlocklistHandler.Factory blocklistHandlerFactory =
new NoOpBlocklistHandler.Factory();
private Function<ResourceID, Boolean> stopWorkerFunction = null;
+ private CompletableFuture<Void> readyToServeFuture =
+ CompletableFuture.completedFuture(null);
private ResourceManagerBuilder withHeartbeatServices(HeartbeatServices
heartbeatServices) {
this.heartbeatServices = heartbeatServices;
@@ -756,6 +820,12 @@ class ResourceManagerTest {
return this;
}
+ public ResourceManagerBuilder withReadyToServeFuture(
+ CompletableFuture<Void> readyToServeFuture) {
+ this.readyToServeFuture = readyToServeFuture;
+ return this;
+ }
+
private TestingResourceManager buildAndStart() throws Exception {
if (heartbeatServices == null) {
heartbeatServices = ResourceManagerTest.heartbeatServices;
@@ -793,7 +863,8 @@ class ResourceManagerTest {
jobLeaderIdService,
testingFatalErrorHandler,
UnregisteredMetricGroups.createUnregisteredResourceManagerMetricGroup(),
- stopWorkerFunction);
+ stopWorkerFunction,
+ readyToServeFuture);
resourceManager.start();
resourceManager.getStartedFuture().get(TIMEOUT.getSize(),
TIMEOUT.getUnit());
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java
index e0947dd3fc7..f73d4df3c0a 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.resourcemanager;
+import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.blocklist.BlocklistHandler;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -31,10 +32,13 @@ import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.security.token.DelegationTokenManager;
+import org.apache.flink.util.TimeUtils;
import javax.annotation.Nullable;
import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Function;
@@ -42,6 +46,7 @@ import java.util.function.Function;
public class TestingResourceManager extends ResourceManager<ResourceID> {
private final Function<ResourceID, Boolean> stopWorkerFunction;
+ private final CompletableFuture<Void> readyToServeFuture;
public TestingResourceManager(
RpcService rpcService,
@@ -55,7 +60,8 @@ public class TestingResourceManager extends
ResourceManager<ResourceID> {
JobLeaderIdService jobLeaderIdService,
FatalErrorHandler fatalErrorHandler,
ResourceManagerMetricGroup resourceManagerMetricGroup,
- Function<ResourceID, Boolean> stopWorkerFunction) {
+ Function<ResourceID, Boolean> stopWorkerFunction,
+ CompletableFuture<Void> readyToServeFuture) {
super(
rpcService,
leaderSessionId,
@@ -73,6 +79,7 @@ public class TestingResourceManager extends
ResourceManager<ResourceID> {
ForkJoinPool.commonPool());
this.stopWorkerFunction = stopWorkerFunction;
+ this.readyToServeFuture = readyToServeFuture;
}
@Override
@@ -106,4 +113,13 @@ public class TestingResourceManager extends
ResourceManager<ResourceID> {
public boolean stopWorker(ResourceID worker) {
return stopWorkerFunction.apply(worker);
}
+
+ @Override
+ public CompletableFuture<Void> getReadyToServeFuture() {
+ return readyToServeFuture;
+ }
+
+ public <T> CompletableFuture<T> runInMainThread(Callable<T> callable, Time
timeout) {
+ return callAsync(callable, TimeUtils.toDuration(timeout));
+ }
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManagerFactory.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManagerFactory.java
index 0646b0b8475..181bc8c9618 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManagerFactory.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManagerFactory.java
@@ -238,5 +238,10 @@ public class TestingResourceManagerFactory extends
ResourceManagerFactory<Resour
return getTerminationFutureFunction.apply(
MockResourceManager.this, super.getTerminationFuture());
}
+
+ @Override
+ public CompletableFuture<Void> getReadyToServeFuture() {
+ return CompletableFuture.completedFuture(null);
+ }
}
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java
index 04faad11182..764147d405b 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java
@@ -47,6 +47,8 @@ import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.RunnableWithException;
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableSet;
+
import org.junit.ClassRule;
import org.junit.Test;
@@ -67,6 +69,7 @@ import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.Assume.assumeTrue;
@@ -841,6 +844,71 @@ public class ActiveResourceManagerTest extends TestLogger {
};
}
+ @Test
+ public void testResourceManagerRecoveredAfterAllTMRegistered() throws
Exception {
+ new Context() {
+ {
+ final ResourceID tmResourceId1 = ResourceID.generate();
+ final ResourceID tmResourceId2 = ResourceID.generate();
+
+ runTest(
+ () -> {
+ // workers recovered
+ runInMainThread(
+ () ->
+ getResourceManager()
+
.onPreviousAttemptWorkersRecovered(
+ ImmutableSet.of(
+
tmResourceId1, tmResourceId2)));
+
+ runInMainThread(
+ () ->
getResourceManager().onWorkerRegistered(tmResourceId1));
+ runInMainThread(
+ () ->
getResourceManager().onWorkerRegistered(tmResourceId2));
+ runInMainThread(
+ () ->
+ assertTrue(
+
getResourceManager()
+
.getReadyToServeFuture()
+ .isDone()))
+ .get(TIMEOUT_SEC, TimeUnit.SECONDS);
+ });
+ }
+ };
+ }
+
+ @Test
+ public void testResourceManagerRecoveredAfterReconcileTimeout() throws
Exception {
+ new Context() {
+ {
+ final ResourceID tmResourceId1 = ResourceID.generate();
+ final ResourceID tmResourceId2 = ResourceID.generate();
+
+ flinkConfig.set(
+
ResourceManagerOptions.RESOURCE_MANAGER_PREVIOUS_WORKER_RECOVERY_TIMEOUT,
+ Duration.ofMillis(TESTING_START_WORKER_TIMEOUT_MS));
+
+ runTest(
+ () -> {
+ // workers recovered
+ runInMainThread(
+ () -> {
+ getResourceManager()
+
.onPreviousAttemptWorkersRecovered(
+ ImmutableSet.of(
+ tmResourceId1,
tmResourceId2));
+ });
+
+ runInMainThread(
+ () ->
getResourceManager().onWorkerRegistered(tmResourceId1));
+ getResourceManager()
+ .getReadyToServeFuture()
+ .get(TIMEOUT_SEC, TimeUnit.SECONDS);
+ });
+ }
+ };
+ }
+
private static class Context {
final Configuration flinkConfig = new Configuration();
@@ -886,6 +954,10 @@ public class ActiveResourceManagerTest extends TestLogger {
configuration.get(ResourceManagerOptions.START_WORKER_RETRY_INTERVAL);
final Duration workerRegistrationTimeout =
configuration.get(ResourceManagerOptions.TASK_MANAGER_REGISTRATION_TIMEOUT);
+ final Duration previousWorkerRecoverTimeout =
+ configuration.get(
+ ResourceManagerOptions
+
.RESOURCE_MANAGER_PREVIOUS_WORKER_RECOVERY_TIMEOUT);
final ActiveResourceManager<ResourceID> activeResourceManager =
new ActiveResourceManager<>(
@@ -907,6 +979,7 @@ public class ActiveResourceManagerTest extends TestLogger {
configuration),
retryInterval,
workerRegistrationTimeout,
+ previousWorkerRecoverTimeout,
ForkJoinPool.commonPool());
activeResourceManager.start();
@@ -917,8 +990,8 @@ public class ActiveResourceManagerTest extends TestLogger {
return activeResourceManager;
}
- void runInMainThread(Runnable runnable) {
- resourceManager.runInMainThread(
+ CompletableFuture<Void> runInMainThread(Runnable runnable) {
+ return resourceManager.runInMainThread(
() -> {
runnable.run();
return null;