kaibozhou commented on a change in pull request #11250: [FLINK-16302][rest]add
log list and read log by name for taskmanager
URL: https://github.com/apache/flink/pull/11250#discussion_r393695494
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
##########
@@ -582,16 +582,41 @@ public void notifySlotAvailable(
}
@Override
- public CompletableFuture<TransientBlobKey>
requestTaskManagerFileUpload(ResourceID taskManagerId, FileType fileType, Time
timeout) {
- log.debug("Request file {} upload from TaskExecutor {}.",
fileType, taskManagerId);
+ public CompletableFuture<TransientBlobKey>
requestTaskManagerFileUploadByType(ResourceID taskManagerId, FileType fileType,
Time timeout) {
+ log.debug("Request file which type is {} upload from
TaskExecutor {}.", fileType, taskManagerId);
final WorkerRegistration<WorkerType> taskExecutor =
taskExecutors.get(taskManagerId);
if (taskExecutor == null) {
- log.debug("Requested file {} upload from unregistered
TaskExecutor {}.", fileType, taskManagerId);
+ log.debug("Requested which type is {} upload from
unregistered TaskExecutor {}.", fileType, taskManagerId);
return FutureUtils.completedExceptionally(new
UnknownTaskExecutorException(taskManagerId));
} else {
- return
taskExecutor.getTaskExecutorGateway().requestFileUpload(fileType, timeout);
+ return
taskExecutor.getTaskExecutorGateway().requestFileUploadByType(fileType,
timeout);
+ }
+ }
+
+ @Override
+ public CompletableFuture<TransientBlobKey>
requestTaskManagerFileUploadByName(ResourceID taskManagerId, String fileName,
Time timeout) {
+ log.debug("Request file which name is {} upload from
TaskExecutor {}.", fileName, taskManagerId);
+
+ final WorkerRegistration<WorkerType> taskExecutor =
taskExecutors.get(taskManagerId);
+
+ if (taskExecutor == null) {
+ log.debug("Requested file which name is {} upload from
unregistered TaskExecutor {}.", fileName, taskManagerId);
+ return FutureUtils.completedExceptionally(new
UnknownTaskExecutorException(taskManagerId));
+ } else {
+ return
taskExecutor.getTaskExecutorGateway().requestFileUploadByName(fileName,
timeout);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Collection<Tuple2<String, Long>>>
requestTaskManagerLogList(ResourceID taskManagerId, Time timeout) {
+ final WorkerRegistration<WorkerType> taskExecutor =
taskExecutors.get(taskManagerId);
+ if (taskExecutor == null) {
+ log.debug("Requested historical loglist from
unregistered TaskExecutor {}.", taskManagerId);
Review comment:
```suggestion
log.debug("Requested historical log list from
unregistered TaskExecutor {}.", taskManagerId);
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services