Thesharing commented on code in PR #19275:
URL: https://github.com/apache/flink/pull/19275#discussion_r845254295


##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1103,23 +1096,64 @@ private void archiveExecutionGraph(ExecutionGraphInfo executionGraphInfo) {
                     executionGraphInfo.getArchivedExecutionGraph().getJobID(),
                     e);
         }
+    }
 
-        // do not create an archive for suspended jobs, as this would eventually lead to multiple
-        // archive attempts which we currently do not support
-        if (executionGraphInfo.getArchivedExecutionGraph().getState().isGloballyTerminalState()) {
-            final CompletableFuture<Acknowledge> executionGraphFuture =
-                    historyServerArchivist.archiveExecutionGraph(executionGraphInfo);
-
-            executionGraphFuture.whenComplete(
-                    (Acknowledge ignored, Throwable throwable) -> {
-                        if (throwable != null) {
-                            log.info(
-                                    "Could not archive completed job {}({}) to the history server.",
-                                    executionGraphInfo.getArchivedExecutionGraph().getJobName(),
-                                    executionGraphInfo.getArchivedExecutionGraph().getJobID(),
-                                    throwable);
-                        }
-                    });
+    private CompletableFuture<Acknowledge> archiveExecutionGraphToHistoryServer(
+            ExecutionGraphInfo executionGraphInfo) {
+
+        final CompletableFuture<Acknowledge> executionGraphFuture =
+                FutureUtils.orTimeout(
+                        historyServerArchivist.archiveExecutionGraph(executionGraphInfo),
+                        configuration.get(ClusterOptions.CLUSTER_SERVICES_SHUTDOWN_TIMEOUT),
+                        TimeUnit.MILLISECONDS,
+                        getMainThreadExecutor());

Review Comment:
   In our production environment, we hit a case where a user pointed
`historyserver.archive.fs.dir` at a remote OSS path. Archiving to OSS took a
very long time because the destination bucket held billions of files and OSS
was busy listing all of them. I don't think we can guarantee that every
`FileSystem` implementation has its own timeout mechanism.
   
   I'm worried that without a timeout, the job will never finish in the worst
case. I fully understand that a user who enables the history server configs
wants the archiving to complete. However, in that worst case the job never
ends and the user has to shut it down manually, with no resource cleanup. In
my opinion, resource cleanup is more important than archiving, even when the
history server is enabled. Therefore, I think a timeout is necessary for the
archiving. WDYT?
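   
   To illustrate the behavior I have in mind, here is a minimal standalone sketch. It uses the JDK's `CompletableFuture.orTimeout` (Java 9+) rather than Flink's `FutureUtils`, and the class name, method names, and `String` acknowledgement are hypothetical stand-ins, not code from this PR. It only shows that bounding the archive future lets cleanup run even if the archive call hangs.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class ArchiveTimeoutSketch {

    /** Hypothetical stand-in for an archive call stuck on a slow FileSystem. */
    static CompletableFuture<String> hangingArchive() {
        // Nobody ever completes this future, so without a timeout it blocks forever.
        return new CompletableFuture<>();
    }

    /**
     * Bounds the archive future with a timeout, then always continues to cleanup,
     * whether the archive succeeded, failed, or timed out.
     */
    static CompletableFuture<String> archiveThenCleanup(
            CompletableFuture<String> archiveFuture, long timeoutMillis) {
        return archiveFuture
                // Completes the future exceptionally if it is still pending after the timeout.
                .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS)
                // handle() runs on both success and failure, so cleanup is never skipped.
                .handle(
                        (ack, throwable) -> {
                            if (throwable != null) {
                                return "cleanup after failed archive: "
                                        + throwable.getClass().getSimpleName();
                            }
                            return "cleanup after archive: " + ack;
                        });
    }

    public static void main(String[] args) {
        // Archive hangs: the timeout fires and cleanup still runs.
        System.out.println(archiveThenCleanup(hangingArchive(), 100).join());
        // Archive succeeds: cleanup runs right after the acknowledgement.
        System.out.println(
                archiveThenCleanup(CompletableFuture.completedFuture("ACK"), 100).join());
    }
}
```

   The trade-off is the one described above: a timed-out archive may be lost, but the job still reaches a terminal state and its resources get released.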


