GJL commented on a change in pull request #11571: [FLINK-16710][runtime] Log Upload blocks Main Thread in TaskExecutor
URL: https://github.com/apache/flink/pull/11571#discussion_r402362304
 
 

 ##########
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 ##########
 @@ -315,21 +315,22 @@ public TaskExecutor(
 
        @Override
        public CompletableFuture<Collection<LogInfo>> requestLogList(Time timeout) {
-               final String logDir = taskManagerConfiguration.getTaskManagerLogDir();
-               if (logDir != null) {
-                       final File[] logFiles = new File(logDir).listFiles();
+               return CompletableFuture.supplyAsync(() -> {
+                       final String logDir = taskManagerConfiguration.getTaskManagerLogDir();
+                       if (logDir != null) {
+                               final File[] logFiles = new File(logDir).listFiles();
 
-                       if (logFiles == null) {
-                               return FutureUtils.completedExceptionally(new FlinkException(String.format("There isn't a log file in TaskExecutor’s log dir %s.", logDir)));
-                       }
+                               if (logFiles == null) {
+                                       throw new CompletionException(new FlinkException(String.format("There isn't a log file in TaskExecutor’s log dir %s.", logDir)));
+                               }
 
-                       final List<LogInfo> logsWithLength = Arrays.stream(logFiles)
-                               .filter(File::isFile)
-                               .map(logFile -> new LogInfo(logFile.getName(), logFile.length()))
-                               .collect(Collectors.toList());
-                       return CompletableFuture.completedFuture(logsWithLength);
-               }
-               return CompletableFuture.completedFuture(Collections.emptyList());
+                               return Arrays.stream(logFiles)
+                                               .filter(File::isFile)
+                                               .map(logFile -> new LogInfo(logFile.getName(), logFile.length()))
+                                               .collect(Collectors.toList());
+                       }
+                       return Collections.emptyList();
+               }, taskExecutorServices.getIOExecutor());
 
 Review comment:
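   I would declare a field for the I/O executor and initialize it in the constructor, rather than calling `taskExecutorServices.getIOExecutor()` at the point of use: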
   
   ```
        /** The kvState registration service in the task manager. */
        private final KvStateService kvStateService;
   
        private final Executor ioExecutor;
   
        // --------- job manager connections -----------
   ```
   
   ```
                this.kvStateService = taskExecutorServices.getKvStateService();
                this.ioExecutor = taskExecutorServices.getIOExecutor();
   
                this.resourceManagerLeaderRetriever = haServices.getResourceManagerLeaderRetriever();
   ```
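
For illustration, here is a minimal, self-contained sketch of the suggested pattern outside of Flink. `LogDirectoryLister` is a hypothetical stand-in for `TaskExecutor`; it returns plain file names instead of `LogInfo` and throws an `IllegalStateException` instead of a `FlinkException`, so none of these names come from the Flink code base. The point it tries to show is only that the executor is captured once in the constructor and then reused by the asynchronous call.

```java
import java.io.File;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

/** Hypothetical stand-in for TaskExecutor; illustrative only, not Flink code. */
final class LogDirectoryLister {

    /** Directory to list; may be null when no log directory is configured. */
    private final String logDir;

    /** Executor for blocking file-system calls, assigned once in the constructor. */
    private final Executor ioExecutor;

    LogDirectoryLister(String logDir, Executor ioExecutor) {
        this.logDir = logDir;
        this.ioExecutor = ioExecutor;
    }

    /** Lists regular files in the log directory on the I/O executor. */
    CompletableFuture<List<String>> requestLogList() {
        return CompletableFuture.supplyAsync(() -> {
            if (logDir == null) {
                return Collections.<String>emptyList();
            }
            // listFiles() returns null if the path is not a directory or an I/O error occurs.
            final File[] logFiles = new File(logDir).listFiles();
            if (logFiles == null) {
                throw new CompletionException(
                    new IllegalStateException("Cannot list log dir " + logDir));
            }
            return Arrays.stream(logFiles)
                .filter(File::isFile)
                .map(File::getName)
                .collect(Collectors.toList());
        }, ioExecutor);
    }

    public static void main(String[] args) {
        ExecutorService io = Executors.newSingleThreadExecutor();
        try {
            LogDirectoryLister lister =
                new LogDirectoryLister(System.getProperty("java.io.tmpdir"), io);
            // join() blocks the caller here only for the sake of the demo.
            System.out.println(lister.requestLogList().join().size() + " file(s) found");
        } finally {
            io.shutdown();
        }
    }
}
```

Holding the executor in a field keeps the method body free of service lookups and makes the dependency easy to replace in tests, for example with a direct executor such as `Runnable::run`.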

