Github user GJL commented on a diff in the pull request:
https://github.com/apache/flink/pull/5341#discussion_r165941154
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
---
@@ -473,10 +476,55 @@ public WebMonitorEndpoint(
handlers.add(Tuple2.of(SubtaskCurrentAttemptDetailsHeaders.getInstance(), subtaskCurrentAttemptDetailsHandler));
handlers.add(Tuple2.of(JobVertexTaskManagersHeaders.getInstance(), jobVertexTaskManagersHandler));
- // This handler MUST be added last, as it otherwise masks all subsequent GET handlers
optWebContent.ifPresent(
webContent ->
handlers.add(Tuple2.of(WebContentHandlerSpecification.getInstance(),
webContent)));
+ // load the log and stdout file handler for the main cluster component
+ final WebMonitorUtils.LogFileLocation logFileLocation = WebMonitorUtils.LogFileLocation.find(clusterConfiguration);
+
+ final ChannelInboundHandler logFileHandler;
+
+ if (logFileLocation.logFile == null) {
+ logFileHandler = new ConstantTextHandler("(log file unavailable)");
+ } else {
+ ChannelInboundHandler staticFileServerHandler;
+ try {
+ staticFileServerHandler = new StaticFileServerHandler<>(
+ leaderRetriever,
+ restAddressFuture,
+ timeout,
+ logFileLocation.logFile);
+ } catch (IOException e) {
+ log.info("Cannot load log file handler.", e);
+ staticFileServerHandler = new ConstantTextHandler("(log file unavailable)");
+ }
+
+ logFileHandler = staticFileServerHandler;
+ }
+
+ final ChannelInboundHandler stdoutFileHandler;
+
+ if (logFileLocation.stdOutFile == null) {
--- End diff --
I would try to avoid the code duplication between the log file and stdout file handlers by extracting a helper method, e.g.,
```
final ChannelInboundHandler logFileHandler =
    createLogFileHandler(logFileLocation.logFile, restAddressFuture);
final ChannelInboundHandler stdoutFileHandler =
    createLogFileHandler(logFileLocation.stdOutFile, restAddressFuture);
```
```
private ChannelInboundHandler createLogFileHandler(
        final File logFile,
        final CompletableFuture<String> restAddressFuture) {
    // No file configured: answer every request with a fixed placeholder text.
    if (logFile == null) {
        return new ConstantTextHandler("(log file unavailable)");
    } else {
        ChannelInboundHandler staticFileServerHandler;
        try {
            staticFileServerHandler = new StaticFileServerHandler<>(
                leaderRetriever,
                restAddressFuture,
                restConfiguration.getTimeout(),
                logFile);
        } catch (IOException e) {
            // SLF4J fills {} with logFile and treats the trailing exception as the cause.
            log.info("Cannot load log file handler ({}).", logFile, e);
            staticFileServerHandler = new ConstantTextHandler("(log file unavailable)");
        }
        return staticFileServerHandler;
    }
}
```
(untested)
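To make the idea easier to try outside of Flink, here is a minimal, self-contained sketch of the same extract-method refactoring. It is only an illustration under stated assumptions: `LogHandlerSketch`, `Handler`, `ConstantText`, and `FileServer` are hypothetical stand-ins, not Flink or Netty classes, and the readability check merely imitates a constructor that can throw `IOException`.

```java
import java.io.File;
import java.io.IOException;

// Hypothetical stand-ins only; none of these types are Flink's or Netty's real classes.
final class LogHandlerSketch {

    /** Minimal stand-in for a channel handler. */
    interface Handler {
        String describe();
    }

    /** Stand-in for a constant-text handler: always answers with fixed text. */
    static final class ConstantText implements Handler {
        private final String text;

        ConstantText(String text) {
            this.text = text;
        }

        @Override
        public String describe() {
            return text;
        }
    }

    /** Stand-in for a static file handler: rejects files it cannot read. */
    static final class FileServer implements Handler {
        private final File file;

        FileServer(File file) throws IOException {
            if (!file.canRead()) {
                throw new IOException("Cannot read " + file);
            }
            this.file = file;
        }

        @Override
        public String describe() {
            return "serving " + file.getName();
        }
    }

    static final String UNAVAILABLE = "(log file unavailable)";

    /** One helper covers both the log file and the stdout file. */
    static Handler createLogFileHandler(File file) {
        if (file == null) {
            return new ConstantText(UNAVAILABLE);
        }
        try {
            return new FileServer(file);
        } catch (IOException e) {
            // Fall back to the placeholder instead of failing endpoint setup.
            return new ConstantText(UNAVAILABLE);
        }
    }

    public static void main(String[] args) throws IOException {
        File log = File.createTempFile("jobmanager", ".log");
        log.deleteOnExit();
        System.out.println(createLogFileHandler(log).describe());
        System.out.println(createLogFileHandler(null).describe());
        System.out.println(createLogFileHandler(new File("does-not-exist.out")).describe());
    }
}
```

Both call sites then shrink to a single call each, and the null check and the `IOException` fallback live in one place.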
---