XComp commented on code in PR #19275:
URL: https://github.com/apache/flink/pull/19275#discussion_r845007972


##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java:
##########
@@ -651,6 +654,91 @@ public void testFailingJobManagerRunnerCleanup() throws Exception {
         awaitStatus(dispatcherGateway, jobId, JobStatus.RUNNING);
     }
 
+    @Test
+    public void testArchiveSuccessfullyWithinTimeout() throws Exception {
+
+        final long archiveTimeout = 1000L;

Review Comment:
   We can use the default timeout of 30000ms in that case, right? We don't have 
to set a different one, if I'm not mistaken.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java:
##########
@@ -651,6 +654,91 @@ public void testFailingJobManagerRunnerCleanup() throws Exception {
         awaitStatus(dispatcherGateway, jobId, JobStatus.RUNNING);
     }
 
+    @Test
+    public void testArchiveSuccessfullyWithinTimeout() throws Exception {
+
+        final long archiveTimeout = 1000L;
+        final CompletableFuture<Void> archiveFuture = new CompletableFuture<>();
+        final TestingHistoryServerArchivist archivist =
+                new TestingHistoryServerArchivist(archiveFuture);
+
+        final TestingJobManagerRunner testingJobManagerRunner =
+                startDispatcherAndSubmitJob(archiveTimeout, archivist);
+        finishJob(testingJobManagerRunner);
+
+        // Before the archiving is finished, the cleanup is not finished and the job is not
+        // terminated.
+        assertThatNoCleanupWasTriggered();
+        final CompletableFuture<Void> jobTerminationFuture =
+                dispatcher.getJobTerminationFuture(jobId, Time.milliseconds(1000L));
+        assertFalse(jobTerminationFuture.isDone());
+
+        archiveFuture.complete(null);
+
+        // Once the archive is finished, the cleanup is finished and the job is terminated.
+        assertGlobalCleanupTriggered(jobId);
+        jobTerminationFuture.join();
+
+        assertTrue(archivist.isArchived());
+    }
+
+    @Test
+    public void testArchiveFailedWhenTimeoutExceeded() throws Exception {
+
+        final long archiveTimeout = 10L;
+        final CompletableFuture<Void> archiveFuture = new CompletableFuture<>();
+        final TestingHistoryServerArchivist archivist =
+                new TestingHistoryServerArchivist(archiveFuture);
+
+        final TestingJobManagerRunner testingJobManagerRunner =
+                startDispatcherAndSubmitJob(archiveTimeout, archivist);
+
+        finishJob(testingJobManagerRunner);
+
+        // Once the archiving is timed out, the cleanup will be finished and the job will be
+        // terminated.
+        assertGlobalCleanupTriggered(jobId);
+        dispatcher.getJobTerminationFuture(jobId, Time.milliseconds(1000L)).join();
+
+        assertFalse(archivist.isArchived());
+    }
+
+    @Test
+    public void testNotArchiveSuspendedJob() throws Exception {
+
+        final CompletableFuture<Void> archiveFuture = new CompletableFuture<>();
+        final TestingHistoryServerArchivist archivist =
+                new TestingHistoryServerArchivist(archiveFuture);
+
+        final TestingJobManagerRunner testingJobManagerRunner =
+                startDispatcherAndSubmitJob(1000L, archivist);
+
+        suspendJob(testingJobManagerRunner);
+
+        assertLocalCleanupTriggered(jobId);
+        dispatcher.getJobTerminationFuture(jobId, Time.milliseconds(1000L)).join();
+
+        assertFalse(archivist.isArchived());
+    }
+
+    private TestingJobManagerRunner startDispatcherAndSubmitJob(

Review Comment:
   I'm not sure whether we actually need that method. It does everything 
`startDispatcherAndSubmitJob` already does (i.e. instantiating a 
`TestingJobMasterServiceLeadershipRunnerFactory`, creating the `Dispatcher`, 
and starting it). The `CLUSTER_SERVICES_SHUTDOWN_TIMEOUT` only has to be set 
once, i.e. we can pass in the `TestingDispatcher.Builder` instance and set the 
`HistoryServerArchivist` in the corresponding test (which is essentially only 
the callback). WDYT?



##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java:
##########
@@ -762,4 +850,29 @@ public JobManagerRunner createJobManagerRunner(
             throw testException;
         }
     }
+
+    private static class TestingHistoryServerArchivist implements HistoryServerArchivist {

Review Comment:
   This class is kind of obsolete. We can replace any of its occurrences with a 
simple callback `executionGraphInfo -> new CompletableFuture<>()` (for archiving 
that never completes) or `executionGraphInfo -> future` if you want to provide 
the future in the test method itself to simulate the completion of the 
archiving. WDYT?
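   For illustration, the lambda-based replacement could look like this minimal, self-contained sketch. Note that the `Archivist` interface here is a simplified stand-in for Flink's `HistoryServerArchivist` (which takes an `ExecutionGraphInfo`), introduced only so the example runs on plain JDK:

```java
import java.util.concurrent.CompletableFuture;

public class LambdaArchivistSketch {

    // Simplified stand-in for Flink's HistoryServerArchivist: a single-method
    // interface returning the archiving future, so a lambda can replace the
    // dedicated TestingHistoryServerArchivist class.
    @FunctionalInterface
    interface Archivist {
        CompletableFuture<Void> archiveExecutionGraph(Object executionGraphInfo);
    }

    public static void main(String[] args) {
        // "Never completing" variant: every call returns a fresh future that
        // is never completed, simulating archiving that hangs forever.
        final Archivist neverCompleting = executionGraphInfo -> new CompletableFuture<>();
        System.out.println(neverCompleting.archiveExecutionGraph(null).isDone()); // false

        // Test-controlled variant: the test owns the future and decides when
        // the archiving "finishes".
        final CompletableFuture<Void> archiveFuture = new CompletableFuture<>();
        final Archivist controlled = executionGraphInfo -> archiveFuture;
        archiveFuture.complete(null); // simulate the archiving completing
        System.out.println(controlled.archiveExecutionGraph(null).isDone()); // true
    }
}
```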



##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java:
##########
@@ -651,6 +654,91 @@ public void testFailingJobManagerRunnerCleanup() throws Exception {
         awaitStatus(dispatcherGateway, jobId, JobStatus.RUNNING);
     }
 
+    @Test
+    public void testArchiveSuccessfullyWithinTimeout() throws Exception {
+
+        final long archiveTimeout = 1000L;
+        final CompletableFuture<Void> archiveFuture = new CompletableFuture<>();
+        final TestingHistoryServerArchivist archivist =
+                new TestingHistoryServerArchivist(archiveFuture);
+
+        final TestingJobManagerRunner testingJobManagerRunner =
+                startDispatcherAndSubmitJob(archiveTimeout, archivist);
+        finishJob(testingJobManagerRunner);
+
+        // Before the archiving is finished, the cleanup is not finished and the job is not
+        // terminated.
+        assertThatNoCleanupWasTriggered();
+        final CompletableFuture<Void> jobTerminationFuture =
+                dispatcher.getJobTerminationFuture(jobId, Time.milliseconds(1000L));
+        assertFalse(jobTerminationFuture.isDone());
+
+        archiveFuture.complete(null);
+
+        // Once the archive is finished, the cleanup is finished and the job is terminated.
+        assertGlobalCleanupTriggered(jobId);
+        jobTerminationFuture.join();
+
+        assertTrue(archivist.isArchived());
+    }
+
+    @Test
+    public void testArchiveFailedWhenTimeoutExceeded() throws Exception {
+
+        final long archiveTimeout = 10L;
+        final CompletableFuture<Void> archiveFuture = new CompletableFuture<>();
+        final TestingHistoryServerArchivist archivist =
+                new TestingHistoryServerArchivist(archiveFuture);
+
+        final TestingJobManagerRunner testingJobManagerRunner =
+                startDispatcherAndSubmitJob(archiveTimeout, archivist);
+
+        finishJob(testingJobManagerRunner);
+
+        // Once the archiving is timed out, the cleanup will be finished and the job will be
+        // terminated.
+        assertGlobalCleanupTriggered(jobId);
+        dispatcher.getJobTerminationFuture(jobId, Time.milliseconds(1000L)).join();

Review Comment:
   I know this issue existed before, but could we also add `@VisibleForTesting` 
to `Dispatcher.getJobTerminationFuture`? I realized that it's package-private 
but only used in tests (fyi: this would be a separate hotfix commit to 
separate this general fix from the actual PR's change).



##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java:
##########
@@ -651,6 +654,91 @@ public void testFailingJobManagerRunnerCleanup() throws Exception {
         awaitStatus(dispatcherGateway, jobId, JobStatus.RUNNING);
     }
 
+    @Test
+    public void testArchiveSuccessfullyWithinTimeout() throws Exception {
+
+        final long archiveTimeout = 1000L;
+        final CompletableFuture<Void> archiveFuture = new CompletableFuture<>();
+        final TestingHistoryServerArchivist archivist =
+                new TestingHistoryServerArchivist(archiveFuture);
+
+        final TestingJobManagerRunner testingJobManagerRunner =
+                startDispatcherAndSubmitJob(archiveTimeout, archivist);
+        finishJob(testingJobManagerRunner);
+
+        // Before the archiving is finished, the cleanup is not finished and the job is not
+        // terminated.
+        assertThatNoCleanupWasTriggered();
+        final CompletableFuture<Void> jobTerminationFuture =
+                dispatcher.getJobTerminationFuture(jobId, Time.milliseconds(1000L));
+        assertFalse(jobTerminationFuture.isDone());
+
+        archiveFuture.complete(null);
+
+        // Once the archive is finished, the cleanup is finished and the job is terminated.
+        assertGlobalCleanupTriggered(jobId);
+        jobTerminationFuture.join();
+
+        assertTrue(archivist.isArchived());
+    }
+
+    @Test
+    public void testArchiveFailedWhenTimeoutExceeded() throws Exception {
+
+        final long archiveTimeout = 10L;
+        final CompletableFuture<Void> archiveFuture = new CompletableFuture<>();
+        final TestingHistoryServerArchivist archivist =
+                new TestingHistoryServerArchivist(archiveFuture);
+
+        final TestingJobManagerRunner testingJobManagerRunner =
+                startDispatcherAndSubmitJob(archiveTimeout, archivist);
+
+        finishJob(testingJobManagerRunner);
+
+        // Once the archiving is timed out, the cleanup will be finished and the job will be
+        // terminated.
+        assertGlobalCleanupTriggered(jobId);
+        dispatcher.getJobTerminationFuture(jobId, Time.milliseconds(1000L)).join();
+
+        assertFalse(archivist.isArchived());
+    }
+
+    @Test
+    public void testNotArchiveSuspendedJob() throws Exception {
Review Comment:
   I really like that you also considered that case. 👍 



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java:
##########
@@ -129,10 +129,12 @@ public MiniDispatcher(
     }
 
     @Override
-    protected CleanupJobState jobReachedTerminalState(ExecutionGraphInfo executionGraphInfo) {
+    protected CompletableFuture<CleanupJobState> jobReachedTerminalState(
+            ExecutionGraphInfo executionGraphInfo) {
         final ArchivedExecutionGraph archivedExecutionGraph =
                 executionGraphInfo.getArchivedExecutionGraph();
-        final CleanupJobState cleanupHAState = super.jobReachedTerminalState(executionGraphInfo);
+        final CompletableFuture<CleanupJobState> cleanupHAState =

Review Comment:
   It's more a question about whether we want the HistoryServer archiving to 
have happened before shutting down the cluster. We came up with the conclusion 
that, if a user configures the HistoryServer, he/she might have the desire to 
have the result archived. In that case, we want to wait for the archiving to 
happen (keep in mind, the HistoryServer is no-op by default; i.e. the user has 
to explicitly configure Flink to use it).
   
   I don't understand your comment about `Originally, the shutdownFuture is 
completed without waiting for completion of jobReachedTerminalState`. Could you 
elaborate a bit more on that? As far as I can see, it did wait for 
`jobReachedTerminalState` to complete (because it was called synchronously). It 
didn't wait for the archiving to finish, though, which is what we want to 
change as far as I understand. Or am I missing something here? 🤔 



##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java:
##########
@@ -651,6 +654,91 @@ public void testFailingJobManagerRunnerCleanup() throws Exception {
         awaitStatus(dispatcherGateway, jobId, JobStatus.RUNNING);
     }
 
+    @Test
+    public void testArchiveSuccessfullyWithinTimeout() throws Exception {
+
+        final long archiveTimeout = 1000L;
+        final CompletableFuture<Void> archiveFuture = new CompletableFuture<>();
+        final TestingHistoryServerArchivist archivist =
+                new TestingHistoryServerArchivist(archiveFuture);
+
+        final TestingJobManagerRunner testingJobManagerRunner =
+                startDispatcherAndSubmitJob(archiveTimeout, archivist);
+        finishJob(testingJobManagerRunner);
+
+        // Before the archiving is finished, the cleanup is not finished and the job is not
+        // terminated.
+        assertThatNoCleanupWasTriggered();
+        final CompletableFuture<Void> jobTerminationFuture =
+                dispatcher.getJobTerminationFuture(jobId, Time.milliseconds(1000L));
+        assertFalse(jobTerminationFuture.isDone());
+
+        archiveFuture.complete(null);
+
+        // Once the archive is finished, the cleanup is finished and the job is terminated.
+        assertGlobalCleanupTriggered(jobId);
+        jobTerminationFuture.join();
+
+        assertTrue(archivist.isArchived());
+    }
+
+    @Test
+    public void testArchiveFailedWhenTimeoutExceeded() throws Exception {
+
+        final long archiveTimeout = 10L;
+        final CompletableFuture<Void> archiveFuture = new CompletableFuture<>();
+        final TestingHistoryServerArchivist archivist =
+                new TestingHistoryServerArchivist(archiveFuture);
+
+        final TestingJobManagerRunner testingJobManagerRunner =
+                startDispatcherAndSubmitJob(archiveTimeout, archivist);
+
+        finishJob(testingJobManagerRunner);
+
+        // Once the archiving is timed out, the cleanup will be finished and the job will be
+        // terminated.
+        assertGlobalCleanupTriggered(jobId);
+        dispatcher.getJobTerminationFuture(jobId, Time.milliseconds(1000L)).join();

Review Comment:
   ```suggestion
           dispatcher.getJobTerminationFuture(jobId, Time.hours(1)).join();
   ```
   We're trying to avoid short timeouts (the same reasoning I already brought 
up for the `@Test(timeout = 5000L)` annotation): in most cases, we're expecting 
the result to complete soonish. In that case, we don't need a timeout and, 
therefore, rather go for a quite high one. This has the benefit that the CI 
timeout would kick in, which would print the stacktraces of all threads. That 
would help investigate any issues that caused the timeout.
   This shouldn't be a problem in the happy case because we're still expecting 
the future to return quite quickly.
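   The "generous timeouts are harmless on the happy path" claim can be demonstrated with a small, self-contained JDK example (not Flink code): `get()` returns as soon as the future completes, regardless of how large the timeout is.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class GenerousTimeoutSketch {

    public static void main(String[] args) throws Exception {
        final CompletableFuture<String> result = CompletableFuture.completedFuture("done");

        final long start = System.nanoTime();
        // A very generous timeout costs nothing when the future completes
        // quickly: get() returns immediately, not after the timeout elapses.
        // Only a genuinely stuck future would ever reach the limit, and then
        // the (shorter) CI timeout fires first and dumps all thread
        // stacktraces, which is far more useful for debugging.
        final String value = result.get(1, TimeUnit.HOURS);
        final long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);

        System.out.println(value);                // done
        System.out.println(elapsedMillis < 1000); // true: returned immediately
    }
}
```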



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1060,40 +1066,24 @@ protected CleanupJobState jobReachedTerminalState(ExecutionGraphInfo executionGr
                     terminalJobStatus);
         }
 
-        archiveExecutionGraph(executionGraphInfo);
+        storeExecutionGraphInfo(executionGraphInfo);
 
         if (terminalJobStatus.isGloballyTerminalState()) {
-            final JobID jobId = executionGraphInfo.getJobId();
-            try {
-                if (jobResultStore.hasCleanJobResultEntry(jobId)) {
-                    log.warn(
-                            "Job {} is already marked as clean but clean up was triggered again.",
-                            jobId);
-                } else if (!jobResultStore.hasDirtyJobResultEntry(jobId)) {
-                    jobResultStore.createDirtyResult(
-                            new JobResultEntry(
-                                    JobResult.createFrom(
-                                            executionGraphInfo.getArchivedExecutionGraph())));
-                    log.info(
-                            "Job {} has been registered for cleanup in the JobResultStore after reaching a terminal state.",
-                            jobId);
-                }
-            } catch (IOException e) {
-                fatalErrorHandler.onFatalError(
-                        new FlinkException(
-                                String.format(
-                                        "The job %s couldn't be marked as pre-cleanup finished in JobResultStore.",
-                                        jobId),
-                                e));
-            }
-        }
 
-        return terminalJobStatus.isGloballyTerminalState()
-                ? CleanupJobState.GLOBAL
-                : CleanupJobState.LOCAL;
+            // do not create an archive for suspended jobs, as this would eventually lead to
+            // multiple archive attempts which we currently do not support
+            CompletableFuture<Acknowledge> archiveFuture =
+                    archiveExecutionGraph(executionGraphInfo);
+
+            registerCleanupInJobResultStore(executionGraphInfo);

Review Comment:
   No, `thenApply` only works in the successful case. We want to register the 
job as completed even if the archiving failed. Therefore, using `whenComplete` 
is the better option.
   
   > However, I put it here because I think if the archiving takes a long time 
and the cluster is killed by the user or the external resource providers, the 
registration is done already and the cleanup will be executed once the job is 
resumed.
   
   The reasoning behind chaining them together was that the HistoryServer is 
opt-in functionality, i.e. if the user configured it, he/she would expect it to 
work even if it takes time. The default works with a no-op history server. WDYT?
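   The `thenApply` vs. `whenComplete` distinction the comment relies on can be shown with a small, self-contained JDK example (plain `CompletableFuture`, not Flink code): `thenApply` callbacks are skipped when the upstream future fails, while `whenComplete` runs in both cases.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

public class WhenCompleteVsThenApply {

    public static void main(String[] args) {
        // Simulate an archiving future that fails.
        final CompletableFuture<Void> archiving = new CompletableFuture<>();
        archiving.completeExceptionally(new RuntimeException("archiving failed"));

        final AtomicBoolean thenApplyRan = new AtomicBoolean(false);
        final AtomicBoolean whenCompleteRan = new AtomicBoolean(false);

        // thenApply callbacks only run if the upstream future succeeded, so
        // this one is skipped entirely.
        archiving.thenApply(ignored -> {
            thenApplyRan.set(true);
            return null;
        });

        // whenComplete runs in both the success and the failure case, which
        // makes it the right hook for "register the job as completed even if
        // the archiving failed".
        archiving.whenComplete((ignored, failure) -> whenCompleteRan.set(true));

        System.out.println("thenApply ran: " + thenApplyRan.get());       // false
        System.out.println("whenComplete ran: " + whenCompleteRan.get()); // true
    }
}
```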
   



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DispatcherResourceCleanerFactory.java:
##########
@@ -126,7 +126,8 @@ public ResourceCleaner createGlobalResourceCleaner(
     * @param localResource Local resource that we want to clean during a global cleanup.
      * @return Globally cleanable resource.
      */
-    private static GloballyCleanableResource ofLocalResource(
+    @VisibleForTesting
+    public static GloballyCleanableResource ofLocalResource(

Review Comment:
   This needs to be reverted as well...



##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java:
##########
@@ -651,6 +654,91 @@ public void testFailingJobManagerRunnerCleanup() throws Exception {
         awaitStatus(dispatcherGateway, jobId, JobStatus.RUNNING);
     }
 
+    @Test
+    public void testArchiveSuccessfullyWithinTimeout() throws Exception {
+
+        final long archiveTimeout = 1000L;
+        final CompletableFuture<Void> archiveFuture = new CompletableFuture<>();
+        final TestingHistoryServerArchivist archivist =
+                new TestingHistoryServerArchivist(archiveFuture);
+
+        final TestingJobManagerRunner testingJobManagerRunner =
+                startDispatcherAndSubmitJob(archiveTimeout, archivist);
+        finishJob(testingJobManagerRunner);
+
+        // Before the archiving is finished, the cleanup is not finished and the job is not
+        // terminated.
+        assertThatNoCleanupWasTriggered();
+        final CompletableFuture<Void> jobTerminationFuture =
+                dispatcher.getJobTerminationFuture(jobId, Time.milliseconds(1000L));
+        assertFalse(jobTerminationFuture.isDone());
+
+        archiveFuture.complete(null);
+
+        // Once the archive is finished, the cleanup is finished and the job is terminated.
+        assertGlobalCleanupTriggered(jobId);
+        jobTerminationFuture.join();
+
+        assertTrue(archivist.isArchived());
+    }
+
+    @Test
+    public void testArchiveFailedWhenTimeoutExceeded() throws Exception {
+
+        final long archiveTimeout = 10L;

Review Comment:
   ```suggestion
           // setting a low timeout to simulate a timeout of the archiving step
           final long archiveTimeout = 10L;
   ```
   nit: Adding a comment here might help understanding the test a bit better



##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java:
##########
@@ -651,6 +654,91 @@ public void testFailingJobManagerRunnerCleanup() throws Exception {
         awaitStatus(dispatcherGateway, jobId, JobStatus.RUNNING);
     }
 
+    @Test
+    public void testArchiveSuccessfullyWithinTimeout() throws Exception {
+
+        final long archiveTimeout = 1000L;
+        final CompletableFuture<Void> archiveFuture = new CompletableFuture<>();
+        final TestingHistoryServerArchivist archivist =
+                new TestingHistoryServerArchivist(archiveFuture);
+
+        final TestingJobManagerRunner testingJobManagerRunner =
+                startDispatcherAndSubmitJob(archiveTimeout, archivist);
+        finishJob(testingJobManagerRunner);
+
+        // Before the archiving is finished, the cleanup is not finished and the job is not
+        // terminated.
+        assertThatNoCleanupWasTriggered();
+        final CompletableFuture<Void> jobTerminationFuture =
+                dispatcher.getJobTerminationFuture(jobId, Time.milliseconds(1000L));
+        assertFalse(jobTerminationFuture.isDone());
+
+        archiveFuture.complete(null);
+
+        // Once the archive is finished, the cleanup is finished and the job is terminated.
+        assertGlobalCleanupTriggered(jobId);
+        jobTerminationFuture.join();
+
+        assertTrue(archivist.isArchived());
+    }
+
+    @Test
+    public void testArchiveFailedWhenTimeoutExceeded() throws Exception {
+
+        final long archiveTimeout = 10L;
+        final CompletableFuture<Void> archiveFuture = new CompletableFuture<>();
+        final TestingHistoryServerArchivist archivist =
+                new TestingHistoryServerArchivist(archiveFuture);
+
+        final TestingJobManagerRunner testingJobManagerRunner =
+                startDispatcherAndSubmitJob(archiveTimeout, archivist);
+
+        finishJob(testingJobManagerRunner);
+
+        // Once the archiving is timed out, the cleanup will be finished and the job will be
+        // terminated.
+        assertGlobalCleanupTriggered(jobId);
+        dispatcher.getJobTerminationFuture(jobId, Time.milliseconds(1000L)).join();
+
+        assertFalse(archivist.isArchived());
+    }
+
+    @Test
+    public void testNotArchiveSuspendedJob() throws Exception {
+
+        final CompletableFuture<Void> archiveFuture = new CompletableFuture<>();
+        final TestingHistoryServerArchivist archivist =
+                new TestingHistoryServerArchivist(archiveFuture);
+
+        final TestingJobManagerRunner testingJobManagerRunner =
+                startDispatcherAndSubmitJob(1000L, archivist);
+
+        suspendJob(testingJobManagerRunner);
+
+        assertLocalCleanupTriggered(jobId);
+        dispatcher.getJobTerminationFuture(jobId, Time.milliseconds(1000L)).join();
+
+        assertFalse(archivist.isArchived());
+    }
+
+    private TestingJobManagerRunner startDispatcherAndSubmitJob(
+            long archiveTimeout, HistoryServerArchivist historyServerArchivist) throws Exception {
+        final Configuration configuration = new Configuration();
+        configuration.setLong(ClusterOptions.CLUSTER_SERVICES_SHUTDOWN_TIMEOUT, archiveTimeout);
+
+        final TestingJobMasterServiceLeadershipRunnerFactory testingJobManagerRunnerFactory =
+                new TestingJobMasterServiceLeadershipRunnerFactory(0);

Review Comment:
   ```suggestion
                   new TestingJobMasterServiceLeadershipRunnerFactory();
   ```
   There's a default constructor for that setting



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
