Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/5589#discussion_r170935501
--- Diff: flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java ---
@@ -104,175 +91,97 @@ public static void beforeClass() {
 		SavepointSerializers.setFailWhenLegacyStateDetected(false);
 	}
-	@BeforeClass
-	public static void setupCluster() throws Exception {
-		final Configuration configuration = new Configuration();
-
-		FiniteDuration timeout = new FiniteDuration(30L, TimeUnit.SECONDS);
-
-		actorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
-
-		highAvailabilityServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
-			configuration,
-			TestingUtils.defaultExecutor());
-
-		Tuple2<ActorRef, ActorRef> master = JobManager.startJobManagerActors(
-			configuration,
-			actorSystem,
-			TestingUtils.defaultExecutor(),
-			TestingUtils.defaultExecutor(),
-			highAvailabilityServices,
-			NoOpMetricRegistry.INSTANCE,
-			Option.empty(),
-			Option.apply("jm"),
-			Option.apply("arch"),
-			TestingJobManager.class,
-			TestingMemoryArchivist.class);
-
-		jobManager = LeaderRetrievalUtils.retrieveLeaderGateway(
-			highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
-			actorSystem,
-			timeout);
-
-		archiver = new AkkaActorGateway(master._2(), jobManager.leaderSessionID());
-
-		Configuration tmConfig = new Configuration();
-		tmConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
-
-		ActorRef taskManagerRef = TaskManager.startTaskManagerComponentsAndActor(
-			tmConfig,
-			ResourceID.generate(),
-			actorSystem,
-			highAvailabilityServices,
-			NoOpMetricRegistry.INSTANCE,
-			"localhost",
-			Option.apply("tm"),
-			true,
-			TestingTaskManager.class);
-
-		taskManager = new AkkaActorGateway(taskManagerRef, jobManager.leaderSessionID());
-
-		// Wait until connected
-		Object msg = new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor());
-		Await.ready(taskManager.ask(msg, timeout), timeout);
-	}
-
-	@AfterClass
-	public static void tearDownCluster() throws Exception {
-		if (highAvailabilityServices != null) {
-			highAvailabilityServices.closeAndCleanupAllData();
-		}
-
-		if (actorSystem != null) {
-			actorSystem.shutdown();
-		}
-
-		if (archiver != null) {
-			archiver.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
-		}
-
-		if (jobManager != null) {
-			jobManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
-		}
-
-		if (taskManager != null) {
-			taskManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
-		}
-	}
-
 	@Test
 	public void testMigrationAndRestore() throws Throwable {
+		ClassLoader classLoader = this.getClass().getClassLoader();
+		ClusterClient<?> clusterClient = miniClusterResource.getClusterClient();
+		clusterClient.setDetached(true);
+		final Deadline deadline = TEST_TIMEOUT.fromNow();
+
 		// submit job with old version savepoint and create a migrated savepoint in the new version
-		String savepointPath = migrateJob();
+		String savepointPath = migrateJob(classLoader, clusterClient, deadline);
 		// restore from migrated new version savepoint
-		restoreJob(savepointPath);
+		restoreJob(classLoader, clusterClient, deadline, savepointPath);
 	}
-	private String migrateJob() throws Throwable {
+	private String migrateJob(ClassLoader classLoader, ClusterClient<?> clusterClient, Deadline deadline) throws Throwable {
+
 		URL savepointResource = AbstractOperatorRestoreTestBase.class.getClassLoader().getResource("operatorstate/" + getMigrationSavepointName());
 		if (savepointResource == null) {
 			throw new IllegalArgumentException("Savepoint file does not exist.");
 		}
 		JobGraph jobToMigrate = createJobGraph(ExecutionMode.MIGRATE);
 		jobToMigrate.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointResource.getFile()));
-		Object msg;
-		Object result;
+		assertNotNull(jobToMigrate.getJobID());
-		// Submit job graph
-		msg = new JobManagerMessages.SubmitJob(jobToMigrate, ListeningBehaviour.DETACHED);
-		result = Await.result(jobManager.ask(msg, timeout), timeout);
+		clusterClient.submitJob(jobToMigrate, classLoader);
-		if (result instanceof JobManagerMessages.JobResultFailure) {
-			JobManagerMessages.JobResultFailure failure = (JobManagerMessages.JobResultFailure) result;
-			throw new Exception(failure.cause());
-		}
-		Assert.assertSame(JobManagerMessages.JobSubmitSuccess.class, result.getClass());
+		CompletableFuture<JobStatus> jobStatusFuture = FutureUtils.retrySuccesfulWithDelay(
+			() -> clusterClient.getJobStatus(jobToMigrate.getJobID()),
+			deadline.timeLeft().toMillis() / 50,
+			Time.milliseconds(50),
+			(jobStatus) -> jobStatus.equals(JobStatus.RUNNING),
+			TestingUtils.defaultScheduledExecutor());
-		// Wait for all tasks to be running
-		msg = new TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobToMigrate.getJobID());
-		Await.result(jobManager.ask(msg, timeout), timeout);
+		assertEquals(JobStatus.RUNNING, jobStatusFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
 		// Trigger savepoint
 		File targetDirectory = tmpFolder.newFolder();
-		msg = new JobManagerMessages.CancelJobWithSavepoint(jobToMigrate.getJobID(), targetDirectory.getAbsolutePath());
+		String savepointPath = null;
 		// FLINK-6918: Retry cancel with savepoint message in case that StreamTasks were not running
 		// TODO: The retry logic should be removed once the StreamTask lifecycle has been fixed (see FLINK-4714)
-		boolean retry = true;
-		for (int i = 0; retry && i < 10; i++) {
-			Future<Object> future = jobManager.ask(msg, timeout);
-			result = Await.result(future, timeout);
-
-			if (result instanceof JobManagerMessages.CancellationFailure) {
-				Thread.sleep(50L);
-			} else {
-				retry = false;
+		while (deadline.hasTimeLeft() && savepointPath == null) {
+			try {
+				savepointPath = clusterClient.cancelWithSavepoint(
+					jobToMigrate.getJobID(),
+					targetDirectory.getAbsolutePath());
+			} catch (Exception e) {
+				if (!e.toString().matches(".* savepoint for the job .* failed.*")) {
+					throw e;
+				}
 			}
 		}
-		if (result instanceof JobManagerMessages.CancellationFailure) {
-			JobManagerMessages.CancellationFailure failure = (JobManagerMessages.CancellationFailure) result;
-			throw new Exception(failure.cause());
-		}
+		assertNotNull(savepointPath);
-		String savepointPath = ((JobManagerMessages.CancellationSuccess) result).savepointPath();
+		jobStatusFuture = FutureUtils.retrySuccesfulWithDelay(
+			() -> clusterClient.getJobStatus(jobToMigrate.getJobID()),
+			deadline.timeLeft().toMillis() / 50,
+			Time.milliseconds(50),
+			(jobStatus) -> jobStatus.equals(JobStatus.CANCELED),
+			TestingUtils.defaultScheduledExecutor());
-		// Wait until canceled
-		msg = new TestingJobManagerMessages.NotifyWhenJobStatus(jobToMigrate.getJobID(), JobStatus.CANCELED);
-		Await.ready(jobManager.ask(msg, timeout), timeout);
+		assertEquals(JobStatus.CANCELED, jobStatusFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
 		return savepointPath;
 	}
-	private void restoreJob(String savepointPath) throws Exception {
+	private void restoreJob(ClassLoader classLoader, ClusterClient<?> clusterClient, Deadline deadline, String savepointPath) throws Exception {
 		JobGraph jobToRestore = createJobGraph(ExecutionMode.RESTORE);
 		jobToRestore.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath, allowNonRestoredState));
-		Object msg;
-		Object result;
+		assertNotNull(jobToRestore.getJobID());
--- End diff ---
well... if we go down that route, you'd have to check every return value. For example, why aren't you checking that `createJobGraph` doesn't return null?
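Purely as an illustration of where that rule leads (hypothetical code, not something this patch contains, reusing the test's existing helpers and assuming its static imports from `org.junit.Assert`), consistently asserting every return value in `migrateJob` would look roughly like this:

```java
// Hypothetical sketch only -- not part of the PR. If one return value gets
// an assertion, consistency would ask for all of them:
JobGraph jobToMigrate = createJobGraph(ExecutionMode.MIGRATE);
assertNotNull("createJobGraph returned null", jobToMigrate);          // extra check implied by that rule
assertNotNull("job graph has no JobID", jobToMigrate.getJobID());     // the check the patch actually adds

URL savepointResource = AbstractOperatorRestoreTestBase.class.getClassLoader()
		.getResource("operatorstate/" + getMigrationSavepointName());
assertNotNull("Savepoint file does not exist.", savepointResource);   // would replace the explicit null check
```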
---