Thesharing commented on code in PR #19275:
URL: https://github.com/apache/flink/pull/19275#discussion_r845871430


##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1103,23 +1096,64 @@ private void archiveExecutionGraph(ExecutionGraphInfo executionGraphInfo) {
                     executionGraphInfo.getArchivedExecutionGraph().getJobID(),
                     e);
         }
+    }
 
-        // do not create an archive for suspended jobs, as this would eventually lead to multiple
-        // archive attempts which we currently do not support
-        if (executionGraphInfo.getArchivedExecutionGraph().getState().isGloballyTerminalState()) {
-            final CompletableFuture<Acknowledge> executionGraphFuture =
-                    historyServerArchivist.archiveExecutionGraph(executionGraphInfo);
-
-            executionGraphFuture.whenComplete(
-                    (Acknowledge ignored, Throwable throwable) -> {
-                        if (throwable != null) {
-                            log.info(
-                                    "Could not archive completed job {}({}) to the history server.",
-                                    executionGraphInfo.getArchivedExecutionGraph().getJobName(),
-                                    executionGraphInfo.getArchivedExecutionGraph().getJobID(),
-                                    throwable);
-                        }
-                    });
+    private CompletableFuture<Acknowledge> archiveExecutionGraphToHistoryServer(
+            ExecutionGraphInfo executionGraphInfo) {
+
+        final CompletableFuture<Acknowledge> executionGraphFuture =
+                FutureUtils.orTimeout(
+                        historyServerArchivist.archiveExecutionGraph(executionGraphInfo),
+                        configuration.get(ClusterOptions.CLUSTER_SERVICES_SHUTDOWN_TIMEOUT),
+                        TimeUnit.MILLISECONDS,
+                        getMainThreadExecutor());

Review Comment:
   > To me, it sounds like an issue of the FileSystem implementation. Or am I missing something here?
   
   Yes, you are right. It's an issue of the `FileSystem` implementation: listing the files in the bucket is an implementation detail of the OSS `FileSystem`. However, we cannot be sure that every `FileSystem` implementation has a timeout mechanism. If one doesn't, the archiving future might never complete, so I think it would be safer to add a timeout here. We try our best to archive the ExecutionGraph, but if it takes too long, we can report an error telling users that the destination they specified is unreachable within the time limit.
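   To illustrate the idea of bounding the archive call, here is a minimal standalone sketch. It assumes Java 9+ and uses the JDK's own `CompletableFuture.orTimeout` rather than Flink's `FutureUtils.orTimeout` from the diff; `archive` and `archiveBounded` are hypothetical names, and the slow, timeout-less `FileSystem` is simulated with a delayed executor. It is not the Dispatcher code, just the same pattern: the future either completes normally or fails with a `TimeoutException` that is turned into an error message instead of blocking forever.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ArchiveWithTimeout {

    /** Hypothetical stand-in for an archive call backed by a FileSystem without its own timeout. */
    static CompletableFuture<String> archive(long delayMillis) {
        // Simulates a slow remote listing/upload by delaying completion.
        Executor delayed = CompletableFuture.delayedExecutor(delayMillis, TimeUnit.MILLISECONDS);
        return CompletableFuture.supplyAsync(() -> "archived", delayed);
    }

    /** Bounds the archive future so callers get an outcome within timeoutMillis. */
    static String archiveBounded(long delayMillis, long timeoutMillis) {
        CompletableFuture<String> bounded =
                archive(delayMillis).orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
        return bounded
                .handle(
                        (result, throwable) -> {
                            if (throwable == null) {
                                return result;
                            }
                            // Unwrap in case the failure arrives wrapped by a dependent stage.
                            Throwable cause =
                                    throwable instanceof CompletionException
                                            ? throwable.getCause()
                                            : throwable;
                            if (cause instanceof TimeoutException) {
                                return "timed out after " + timeoutMillis + " ms";
                            }
                            return "failed: " + cause;
                        })
                .join();
    }

    public static void main(String[] args) {
        // Fast destination: completes well within the timeout.
        System.out.println(archiveBounded(10, 1_000));
        // Unreachable-like destination: the timeout fires first.
        System.out.println(archiveBounded(5_000, 100));
    }
}
```

   Converting the timeout into a message in `handle` mirrors the "report an error and move on" intent, rather than propagating the failure to whatever is waiting on the archive.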
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to