GJL commented on a change in pull request #11571: [FLINK-16710][runtime] Log
Upload blocks Main Thread in TaskExecutor
URL: https://github.com/apache/flink/pull/11571#discussion_r402362304
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
##########
@@ -315,21 +315,22 @@ public TaskExecutor(
@Override
public CompletableFuture<Collection<LogInfo>> requestLogList(Time
timeout) {
- final String logDir =
taskManagerConfiguration.getTaskManagerLogDir();
- if (logDir != null) {
- final File[] logFiles = new File(logDir).listFiles();
+ return CompletableFuture.supplyAsync(() -> {
+ final String logDir =
taskManagerConfiguration.getTaskManagerLogDir();
+ if (logDir != null) {
+ final File[] logFiles = new
File(logDir).listFiles();
- if (logFiles == null) {
- return FutureUtils.completedExceptionally(new
FlinkException(String.format("There isn't a log file in TaskExecutor’s log dir
%s.", logDir)));
- }
+ if (logFiles == null) {
+ throw new CompletionException(new
FlinkException(String.format("There isn't a log file in TaskExecutor’s log dir
%s.", logDir)));
+ }
- final List<LogInfo> logsWithLength =
Arrays.stream(logFiles)
- .filter(File::isFile)
- .map(logFile -> new LogInfo(logFile.getName(),
logFile.length()))
- .collect(Collectors.toList());
- return
CompletableFuture.completedFuture(logsWithLength);
- }
- return
CompletableFuture.completedFuture(Collections.emptyList());
+ return Arrays.stream(logFiles)
+ .filter(File::isFile)
+ .map(logFile -> new
LogInfo(logFile.getName(), logFile.length()))
+ .collect(Collectors.toList());
+ }
+ return Collections.emptyList();
+ }, taskExecutorServices.getIOExecutor());
Review comment:
I would declare a field and initialize it in the constructor:
```
/** The kvState registration service in the task manager. */
private final KvStateService kvStateService;
private final Executor ioExecutor;
// --------- job manager connections -----------
```
```
this.kvStateService = taskExecutorServices.getKvStateService();
this.ioExecutor = taskExecutorServices.getIOExecutor();
this.resourceManagerLeaderRetriever =
haServices.getResourceManagerLeaderRetriever();
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services