Thesharing commented on code in PR #19275:
URL: https://github.com/apache/flink/pull/19275#discussion_r845090835


##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java:
##########
@@ -651,6 +654,91 @@ public void testFailingJobManagerRunnerCleanup() throws Exception {
         awaitStatus(dispatcherGateway, jobId, JobStatus.RUNNING);
     }
 
+    @Test
+    public void testArchiveSuccessfullyWithinTimeout() throws Exception {
+
+        final long archiveTimeout = 1000L;
+        final CompletableFuture<Void> archiveFuture = new 
CompletableFuture<>();
+        final TestingHistoryServerArchivist archivist =
+                new TestingHistoryServerArchivist(archiveFuture);
+
+        final TestingJobManagerRunner testingJobManagerRunner =
+                startDispatcherAndSubmitJob(archiveTimeout, archivist);
+        finishJob(testingJobManagerRunner);
+
+        // Before the archiving is finished, the cleanup is not finished and 
the job is not
+        // terminated.
+        assertThatNoCleanupWasTriggered();
+        final CompletableFuture<Void> jobTerminationFuture =
+                dispatcher.getJobTerminationFuture(jobId, 
Time.milliseconds(1000L));
+        assertFalse(jobTerminationFuture.isDone());
+
+        archiveFuture.complete(null);
+
+        // Once the archive is finished, the cleanup is finished and the job 
is terminated.
+        assertGlobalCleanupTriggered(jobId);
+        jobTerminationFuture.join();
+
+        assertTrue(archivist.isArchived());
+    }
+
+    @Test
+    public void testArchiveFailedWhenTimeoutExceeded() throws Exception {
+
+        final long archiveTimeout = 10L;
+        final CompletableFuture<Void> archiveFuture = new 
CompletableFuture<>();
+        final TestingHistoryServerArchivist archivist =
+                new TestingHistoryServerArchivist(archiveFuture);
+
+        final TestingJobManagerRunner testingJobManagerRunner =
+                startDispatcherAndSubmitJob(archiveTimeout, archivist);
+
+        finishJob(testingJobManagerRunner);
+
+        // Once the archiving is timed out, the cleanup will be finished and 
the job will be
+        // terminated.
+        assertGlobalCleanupTriggered(jobId);
+        dispatcher.getJobTerminationFuture(jobId, Time.milliseconds(1000L)).join();

Review Comment:
   I'm not sure about this one. `Dispatcher.getJobTerminationFuture` has 
already been annotated with `@VisibleForTesting`.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java:
##########
@@ -129,10 +129,12 @@ public MiniDispatcher(
     }
 
     @Override
-    protected CleanupJobState jobReachedTerminalState(ExecutionGraphInfo executionGraphInfo) {
+    protected CompletableFuture<CleanupJobState> jobReachedTerminalState(
+            ExecutionGraphInfo executionGraphInfo) {
         final ArchivedExecutionGraph archivedExecutionGraph =
                 executionGraphInfo.getArchivedExecutionGraph();
-        final CleanupJobState cleanupHAState = super.jobReachedTerminalState(executionGraphInfo);
+        final CompletableFuture<CleanupJobState> cleanupHAState =

Review Comment:
   > ... the conclusion that, if a user configures the HistoryServer, he/she 
might have the desire to have the result archived. In that case, we want to 
wait for the archiving to happen (keep in mind, the HistoryServer is no-op by 
default; i.e. the user has to explicitly configure Flink to use it)
   
   Thank you for the detailed explanation. My bad, I forgot this 
conclusion. `shutdownFuture` is now completed in the `whenComplete` of 
`jobReachedTerminalState`.
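
For readers following along, the pattern described above can be sketched in plain Java. This is a minimal illustration, not the actual `MiniDispatcher` code: the class name, the `Status` enum, and the mapping from failure to `UNKNOWN` are assumptions made for the example. It only shows that the shutdown future is completed from a `whenComplete` callback, so shutdown cannot be signalled before the terminal-state future (which includes archiving) has finished.

```java
import java.util.concurrent.CompletableFuture;

public class ShutdownAfterArchivingSketch {

    /** Hypothetical stand-in for the status reported when the cluster shuts down. */
    enum Status {
        SUCCEEDED,
        UNKNOWN
    }

    /**
     * Returns a shutdown future that is completed only after the given terminal-state
     * future has completed, either normally or exceptionally.
     */
    static CompletableFuture<Status> shutDownWhenDone(
            CompletableFuture<Void> terminalStateFuture) {
        final CompletableFuture<Status> shutdownFuture = new CompletableFuture<>();
        terminalStateFuture.whenComplete(
                (ignored, throwable) ->
                        // Assumption for this sketch: a failure maps to UNKNOWN.
                        shutdownFuture.complete(
                                throwable == null ? Status.SUCCEEDED : Status.UNKNOWN));
        return shutdownFuture;
    }

    public static void main(String[] args) {
        final CompletableFuture<Void> archiving = new CompletableFuture<>();
        final CompletableFuture<Status> shutdown = shutDownWhenDone(archiving);

        // Archiving is still pending, so shutdown has not been signalled yet.
        System.out.println("shutdown done before archiving: " + shutdown.isDone());

        archiving.complete(null);
        System.out.println("shutdown status after archiving: " + shutdown.join());
    }
}
```

Because the callback runs on both the normal and the exceptional path, the shutdown future is always completed once the terminal-state future settles.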



##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java:
##########
@@ -651,6 +654,91 @@ public void testFailingJobManagerRunnerCleanup() throws Exception {
         awaitStatus(dispatcherGateway, jobId, JobStatus.RUNNING);
     }
 
+    @Test
+    public void testArchiveSuccessfullyWithinTimeout() throws Exception {
+
+        final long archiveTimeout = 1000L;
+        final CompletableFuture<Void> archiveFuture = new 
CompletableFuture<>();
+        final TestingHistoryServerArchivist archivist =
+                new TestingHistoryServerArchivist(archiveFuture);
+
+        final TestingJobManagerRunner testingJobManagerRunner =
+                startDispatcherAndSubmitJob(archiveTimeout, archivist);
+        finishJob(testingJobManagerRunner);
+
+        // Before the archiving is finished, the cleanup is not finished and 
the job is not
+        // terminated.
+        assertThatNoCleanupWasTriggered();
+        final CompletableFuture<Void> jobTerminationFuture =
+                dispatcher.getJobTerminationFuture(jobId, 
Time.milliseconds(1000L));
+        assertFalse(jobTerminationFuture.isDone());
+
+        archiveFuture.complete(null);
+
+        // Once the archive is finished, the cleanup is finished and the job 
is terminated.
+        assertGlobalCleanupTriggered(jobId);
+        jobTerminationFuture.join();
+
+        assertTrue(archivist.isArchived());
+    }
+
+    @Test
+    public void testArchiveFailedWhenTimeoutExceeded() throws Exception {
+
+        final long archiveTimeout = 10L;

Review Comment:
   Thank you for pointing this out. This comment does make this test case 
easier to understand.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java:
##########
@@ -651,6 +654,91 @@ public void testFailingJobManagerRunnerCleanup() throws Exception {
         awaitStatus(dispatcherGateway, jobId, JobStatus.RUNNING);
     }
 
+    @Test
+    public void testArchiveSuccessfullyWithinTimeout() throws Exception {
+
+        final long archiveTimeout = 1000L;
+        final CompletableFuture<Void> archiveFuture = new 
CompletableFuture<>();
+        final TestingHistoryServerArchivist archivist =
+                new TestingHistoryServerArchivist(archiveFuture);
+
+        final TestingJobManagerRunner testingJobManagerRunner =
+                startDispatcherAndSubmitJob(archiveTimeout, archivist);
+        finishJob(testingJobManagerRunner);
+
+        // Before the archiving is finished, the cleanup is not finished and 
the job is not
+        // terminated.
+        assertThatNoCleanupWasTriggered();
+        final CompletableFuture<Void> jobTerminationFuture =
+                dispatcher.getJobTerminationFuture(jobId, 
Time.milliseconds(1000L));
+        assertFalse(jobTerminationFuture.isDone());
+
+        archiveFuture.complete(null);
+
+        // Once the archive is finished, the cleanup is finished and the job 
is terminated.
+        assertGlobalCleanupTriggered(jobId);
+        jobTerminationFuture.join();
+
+        assertTrue(archivist.isArchived());
+    }
+
+    @Test
+    public void testArchiveFailedWhenTimeoutExceeded() throws Exception {
+
+        final long archiveTimeout = 10L;
+        final CompletableFuture<Void> archiveFuture = new 
CompletableFuture<>();
+        final TestingHistoryServerArchivist archivist =
+                new TestingHistoryServerArchivist(archiveFuture);
+
+        final TestingJobManagerRunner testingJobManagerRunner =
+                startDispatcherAndSubmitJob(archiveTimeout, archivist);
+
+        finishJob(testingJobManagerRunner);
+
+        // Once the archiving is timed out, the cleanup will be finished and 
the job will be
+        // terminated.
+        assertGlobalCleanupTriggered(jobId);
+        dispatcher.getJobTerminationFuture(jobId, 
Time.milliseconds(1000L)).join();
+
+        assertFalse(archivist.isArchived());
+    }
+
+    @Test
+    public void testNotArchiveSuspendedJob() throws Exception {

Review Comment:
   Thank you for the thumbs up :-)



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1215,17 +1238,22 @@ private void jobMasterFailed(JobID jobId, Throwable cause) {
                 getMainThreadExecutor());
     }
 
-    CompletableFuture<Void> getJobTerminationFuture(JobID jobId) {
+    private CompletableFuture<Void> getTerminatedJobTerminationFuture(JobID jobId) {

Review Comment:
   Yes. I think `getJobTerminationFutureOrFailedFutureForRunningJob` is a 
better name. Modified here.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java:
##########
@@ -651,6 +654,91 @@ public void testFailingJobManagerRunnerCleanup() throws Exception {
         awaitStatus(dispatcherGateway, jobId, JobStatus.RUNNING);
     }
 
+    @Test
+    public void testArchiveSuccessfullyWithinTimeout() throws Exception {
+
+        final long archiveTimeout = 1000L;
+        final CompletableFuture<Void> archiveFuture = new 
CompletableFuture<>();
+        final TestingHistoryServerArchivist archivist =
+                new TestingHistoryServerArchivist(archiveFuture);
+
+        final TestingJobManagerRunner testingJobManagerRunner =
+                startDispatcherAndSubmitJob(archiveTimeout, archivist);
+        finishJob(testingJobManagerRunner);
+
+        // Before the archiving is finished, the cleanup is not finished and 
the job is not
+        // terminated.
+        assertThatNoCleanupWasTriggered();
+        final CompletableFuture<Void> jobTerminationFuture =
+                dispatcher.getJobTerminationFuture(jobId, 
Time.milliseconds(1000L));
+        assertFalse(jobTerminationFuture.isDone());
+
+        archiveFuture.complete(null);
+
+        // Once the archive is finished, the cleanup is finished and the job 
is terminated.
+        assertGlobalCleanupTriggered(jobId);
+        jobTerminationFuture.join();
+
+        assertTrue(archivist.isArchived());
+    }
+
+    @Test
+    public void testArchiveFailedWhenTimeoutExceeded() throws Exception {
+
+        final long archiveTimeout = 10L;
+        final CompletableFuture<Void> archiveFuture = new 
CompletableFuture<>();
+        final TestingHistoryServerArchivist archivist =
+                new TestingHistoryServerArchivist(archiveFuture);
+
+        final TestingJobManagerRunner testingJobManagerRunner =
+                startDispatcherAndSubmitJob(archiveTimeout, archivist);
+
+        finishJob(testingJobManagerRunner);
+
+        // Once the archiving is timed out, the cleanup will be finished and 
the job will be
+        // terminated.
+        assertGlobalCleanupTriggered(jobId);
+        dispatcher.getJobTerminationFuture(jobId, Time.milliseconds(1000L)).join();

Review Comment:
   Thank you for this detailed explanation. I should've corrected it here. I'm 
still unfamiliar with the idea that CI is in charge of the timeout. Corrected.
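
The reviewer's original remark is not quoted in this message, so the following is only a sketch of the idea as the reply describes it: a test blocks on the future without its own deadline and relies on the CI job's global timeout to catch a hang. The class and method names below are hypothetical and do not come from the Flink code base.

```java
import java.util.concurrent.CompletableFuture;

public class NoLocalTimeoutSketch {

    /** Hypothetical helper: completes the returned future after the given delay. */
    static CompletableFuture<String> terminateAfter(long delayMillis) {
        final CompletableFuture<String> future = new CompletableFuture<>();
        final Thread worker =
                new Thread(
                        () -> {
                            try {
                                Thread.sleep(delayMillis);
                                future.complete("terminated");
                            } catch (InterruptedException e) {
                                Thread.currentThread().interrupt();
                                future.completeExceptionally(e);
                            }
                        });
        worker.setDaemon(true);
        worker.start();
        return future;
    }

    public static void main(String[] args) {
        // No get(timeout, unit) here: join() blocks until completion. If the future
        // never completed, the surrounding build or CI timeout would fail the run.
        final String result = terminateAfter(50L).join();
        System.out.println(result);
    }
}
```

A short local timeout can turn a slow but correct run into a flaky failure, which is presumably why the per-call timeout was dropped.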



##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java:
##########
@@ -651,6 +654,91 @@ public void testFailingJobManagerRunnerCleanup() throws Exception {
         awaitStatus(dispatcherGateway, jobId, JobStatus.RUNNING);
     }
 
+    @Test
+    public void testArchiveSuccessfullyWithinTimeout() throws Exception {
+
+        final long archiveTimeout = 1000L;

Review Comment:
   Yes, thank you for pointing this out. The timeout I set is redundant. 
Removed.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java:
##########
@@ -651,6 +654,91 @@ public void testFailingJobManagerRunnerCleanup() throws Exception {
         awaitStatus(dispatcherGateway, jobId, JobStatus.RUNNING);
     }
 
+    @Test
+    public void testArchiveSuccessfullyWithinTimeout() throws Exception {
+
+        final long archiveTimeout = 1000L;
+        final CompletableFuture<Void> archiveFuture = new 
CompletableFuture<>();
+        final TestingHistoryServerArchivist archivist =
+                new TestingHistoryServerArchivist(archiveFuture);
+
+        final TestingJobManagerRunner testingJobManagerRunner =
+                startDispatcherAndSubmitJob(archiveTimeout, archivist);
+        finishJob(testingJobManagerRunner);
+
+        // Before the archiving is finished, the cleanup is not finished and 
the job is not
+        // terminated.
+        assertThatNoCleanupWasTriggered();
+        final CompletableFuture<Void> jobTerminationFuture =
+                dispatcher.getJobTerminationFuture(jobId, 
Time.milliseconds(1000L));
+        assertFalse(jobTerminationFuture.isDone());
+
+        archiveFuture.complete(null);
+
+        // Once the archive is finished, the cleanup is finished and the job 
is terminated.
+        assertGlobalCleanupTriggered(jobId);
+        jobTerminationFuture.join();
+
+        assertTrue(archivist.isArchived());
+    }
+
+    @Test
+    public void testArchiveFailedWhenTimeoutExceeded() throws Exception {
+
+        final long archiveTimeout = 10L;
+        final CompletableFuture<Void> archiveFuture = new 
CompletableFuture<>();
+        final TestingHistoryServerArchivist archivist =
+                new TestingHistoryServerArchivist(archiveFuture);
+
+        final TestingJobManagerRunner testingJobManagerRunner =
+                startDispatcherAndSubmitJob(archiveTimeout, archivist);
+
+        finishJob(testingJobManagerRunner);
+
+        // Once the archiving is timed out, the cleanup will be finished and 
the job will be
+        // terminated.
+        assertGlobalCleanupTriggered(jobId);
+        dispatcher.getJobTerminationFuture(jobId, 
Time.milliseconds(1000L)).join();
+
+        assertFalse(archivist.isArchived());
+    }
+
+    @Test
+    public void testNotArchiveSuspendedJob() throws Exception {
+
+        final CompletableFuture<Void> archiveFuture = new 
CompletableFuture<>();
+        final TestingHistoryServerArchivist archivist =
+                new TestingHistoryServerArchivist(archiveFuture);
+
+        final TestingJobManagerRunner testingJobManagerRunner =
+                startDispatcherAndSubmitJob(1000L, archivist);
+
+        suspendJob(testingJobManagerRunner);
+
+        assertLocalCleanupTriggered(jobId);
+        dispatcher.getJobTerminationFuture(jobId, 
Time.milliseconds(1000L)).join();
+
+        assertFalse(archivist.isArchived());
+    }
+
+    private TestingJobManagerRunner startDispatcherAndSubmitJob(

Review Comment:
   Thank you for pointing this out. I should reuse the existing 
implementation here. The redundant function I introduced is now removed.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DispatcherResourceCleanerFactory.java:
##########
@@ -126,7 +126,8 @@ public ResourceCleaner createGlobalResourceCleaner(
      * @param localResource Local resource that we want to clean during a global cleanup.
      * @return Globally cleanable resource.
      */
-    private static GloballyCleanableResource ofLocalResource(
+    @VisibleForTesting
+    public static GloballyCleanableResource ofLocalResource(

Review Comment:
   Sorry, I forgot it. Thank you for reminding me of this 👍



##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java:
##########
@@ -762,4 +850,29 @@ public JobManagerRunner createJobManagerRunner(
             throw testException;
         }
     }
+
+    private static class TestingHistoryServerArchivist implements HistoryServerArchivist {

Review Comment:
   Thank you for proposing a much better implementation that makes the code 
simple and clean. I learned a lot from this. `TestingHistoryServerArchivist` 
is now removed.
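
The proposed replacement is not quoted in this message. One common way to drop a dedicated testing class, shown here purely as an assumption, is to pass a lambda that returns a test-controlled future and records the call. The `Archivist` interface and `finishJob` method below are hypothetical stand-ins, not Flink's `HistoryServerArchivist` API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

public class LambdaArchivistSketch {

    /** Hypothetical single-method archivist interface, so a lambda can implement it. */
    @FunctionalInterface
    interface Archivist {
        CompletableFuture<Void> archive(String jobName);
    }

    /** Hypothetical component under test: cleanup happens only after archiving. */
    static CompletableFuture<String> finishJob(Archivist archivist, String jobName) {
        return archivist.archive(jobName).thenApply(ignored -> jobName + " cleaned up");
    }

    public static void main(String[] args) {
        final CompletableFuture<Void> archiveFuture = new CompletableFuture<>();
        final AtomicBoolean archiveRequested = new AtomicBoolean(false);

        // The lambda replaces a hand-written testing class: it records the call
        // and hands back a future that the test completes explicitly.
        final Archivist archivist =
                jobName -> {
                    archiveRequested.set(true);
                    return archiveFuture;
                };

        final CompletableFuture<String> result = finishJob(archivist, "job");
        System.out.println("archive requested: " + archiveRequested.get());
        System.out.println("cleanup done before archiving: " + result.isDone());

        archiveFuture.complete(null);
        System.out.println(result.join());
    }
}
```

The test keeps full control over when archiving finishes without maintaining extra state inside a separate class.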



##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java:
##########
@@ -651,6 +654,91 @@ public void testFailingJobManagerRunnerCleanup() throws Exception {
         awaitStatus(dispatcherGateway, jobId, JobStatus.RUNNING);
     }
 
+    @Test
+    public void testArchiveSuccessfullyWithinTimeout() throws Exception {
+
+        final long archiveTimeout = 1000L;
+        final CompletableFuture<Void> archiveFuture = new 
CompletableFuture<>();
+        final TestingHistoryServerArchivist archivist =
+                new TestingHistoryServerArchivist(archiveFuture);
+
+        final TestingJobManagerRunner testingJobManagerRunner =
+                startDispatcherAndSubmitJob(archiveTimeout, archivist);
+        finishJob(testingJobManagerRunner);
+
+        // Before the archiving is finished, the cleanup is not finished and 
the job is not
+        // terminated.
+        assertThatNoCleanupWasTriggered();
+        final CompletableFuture<Void> jobTerminationFuture =
+                dispatcher.getJobTerminationFuture(jobId, 
Time.milliseconds(1000L));
+        assertFalse(jobTerminationFuture.isDone());
+
+        archiveFuture.complete(null);
+
+        // Once the archive is finished, the cleanup is finished and the job 
is terminated.
+        assertGlobalCleanupTriggered(jobId);
+        jobTerminationFuture.join();
+
+        assertTrue(archivist.isArchived());
+    }
+
+    @Test
+    public void testArchiveFailedWhenTimeoutExceeded() throws Exception {
+
+        final long archiveTimeout = 10L;
+        final CompletableFuture<Void> archiveFuture = new 
CompletableFuture<>();
+        final TestingHistoryServerArchivist archivist =
+                new TestingHistoryServerArchivist(archiveFuture);
+
+        final TestingJobManagerRunner testingJobManagerRunner =
+                startDispatcherAndSubmitJob(archiveTimeout, archivist);
+
+        finishJob(testingJobManagerRunner);
+
+        // Once the archiving is timed out, the cleanup will be finished and 
the job will be
+        // terminated.
+        assertGlobalCleanupTriggered(jobId);
+        dispatcher.getJobTerminationFuture(jobId, 
Time.milliseconds(1000L)).join();
+
+        assertFalse(archivist.isArchived());
+    }
+
+    @Test
+    public void testNotArchiveSuspendedJob() throws Exception {
+
+        final CompletableFuture<Void> archiveFuture = new 
CompletableFuture<>();
+        final TestingHistoryServerArchivist archivist =
+                new TestingHistoryServerArchivist(archiveFuture);
+
+        final TestingJobManagerRunner testingJobManagerRunner =
+                startDispatcherAndSubmitJob(1000L, archivist);
+
+        suspendJob(testingJobManagerRunner);
+
+        assertLocalCleanupTriggered(jobId);
+        dispatcher.getJobTerminationFuture(jobId, 
Time.milliseconds(1000L)).join();
+
+        assertFalse(archivist.isArchived());
+    }
+
+    private TestingJobManagerRunner startDispatcherAndSubmitJob(
+            long archiveTimeout, HistoryServerArchivist historyServerArchivist) throws Exception {
+        final Configuration configuration = new Configuration();
+        configuration.setLong(ClusterOptions.CLUSTER_SERVICES_SHUTDOWN_TIMEOUT, archiveTimeout);
+
+        final TestingJobMasterServiceLeadershipRunnerFactory testingJobManagerRunnerFactory =
+                new TestingJobMasterServiceLeadershipRunnerFactory(0);

Review Comment:
   Sorry for being careless. This function is removed now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
