[ https://issues.apache.org/jira/browse/FLINK-1579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15897495#comment-15897495 ]
ASF GitHub Bot commented on FLINK-1579:
---------------------------------------
Github user uce commented on a diff in the pull request:
https://github.com/apache/flink/pull/3460#discussion_r104431499
--- Diff:
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
---
@@ -205,215 +125,18 @@ public void channelRead0(ChannelHandlerContext ctx,
Routed routed) throws Except
}
}
- /**
- * Response when running with leading JobManager.
- */
- private void respondAsLeader(ChannelHandlerContext ctx, HttpRequest
request, String requestPath)
- throws IOException, ParseException, URISyntaxException {
-
- // convert to absolute path
- final File file = new File(rootPath, requestPath);
-
- if (!file.exists()) {
- // file does not exist. Try to load it with the
classloader
- ClassLoader cl =
StaticFileServerHandler.class.getClassLoader();
-
- try(InputStream resourceStream =
cl.getResourceAsStream("web" + requestPath)) {
- boolean success = false;
- try {
- if (resourceStream != null) {
- URL root =
cl.getResource("web");
- URL requested =
cl.getResource("web" + requestPath);
-
- if (root != null && requested
!= null) {
- URI rootURI = new
URI(root.getPath()).normalize();
- URI requestedURI = new
URI(requested.getPath()).normalize();
-
- // Check that we don't
load anything from outside of the
- // expected scope.
- if
(!rootURI.relativize(requestedURI).equals(requestedURI)) {
-
logger.debug("Loading missing file from classloader: {}", requestPath);
- // ensure that
directory to file exists.
-
file.getParentFile().mkdirs();
-
Files.copy(resourceStream, file.toPath());
-
- success = true;
- }
- }
- }
- } catch (Throwable t) {
- logger.error("error while responding",
t);
- } finally {
- if (!success) {
- logger.debug("Unable to load
requested file {} from classloader", requestPath);
- sendError(ctx, NOT_FOUND);
- return;
- }
- }
- }
- }
-
- if (!file.exists() || file.isHidden() || file.isDirectory() ||
!file.isFile()) {
- sendError(ctx, NOT_FOUND);
- return;
- }
-
- if
(!file.getCanonicalFile().toPath().startsWith(rootPath.toPath())) {
- sendError(ctx, NOT_FOUND);
- return;
- }
-
- // cache validation
- final String ifModifiedSince =
request.headers().get(IF_MODIFIED_SINCE);
- if (ifModifiedSince != null && !ifModifiedSince.isEmpty()) {
- SimpleDateFormat dateFormatter = new
SimpleDateFormat(HTTP_DATE_FORMAT, Locale.US);
- Date ifModifiedSinceDate =
dateFormatter.parse(ifModifiedSince);
-
- // Only compare up to the second because the datetime
format we send to the client
- // does not have milliseconds
- long ifModifiedSinceDateSeconds =
ifModifiedSinceDate.getTime() / 1000;
- long fileLastModifiedSeconds = file.lastModified() /
1000;
- if (ifModifiedSinceDateSeconds ==
fileLastModifiedSeconds) {
- if (logger.isDebugEnabled()) {
- logger.debug("Responding 'NOT MODIFIED'
for file '" + file.getAbsolutePath() + '\'');
- }
-
- sendNotModified(ctx);
- return;
- }
- }
-
- if (logger.isDebugEnabled()) {
- logger.debug("Responding with file '" +
file.getAbsolutePath() + '\'');
- }
-
- // Don't need to close this manually. Netty's DefaultFileRegion
will take care of it.
- final RandomAccessFile raf;
- try {
- raf = new RandomAccessFile(file, "r");
- }
- catch (FileNotFoundException e) {
- sendError(ctx, NOT_FOUND);
- return;
- }
- long fileLength = raf.length();
-
- HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
- setContentTypeHeader(response, file);
-
- // since the log and out files are rapidly changing, we don't
want to browser to cache them
- if (!(requestPath.contains("log") ||
requestPath.contains("out"))) {
- setDateAndCacheHeaders(response, file);
- }
- if (HttpHeaders.isKeepAlive(request)) {
- response.headers().set(CONNECTION,
HttpHeaders.Values.KEEP_ALIVE);
- }
- HttpHeaders.setContentLength(response, fileLength);
-
- // write the initial line and the header.
- ctx.write(response);
-
- // write the content.
- ChannelFuture lastContentFuture;
- if (ctx.pipeline().get(SslHandler.class) == null) {
- ctx.write(new DefaultFileRegion(raf.getChannel(), 0,
fileLength), ctx.newProgressivePromise());
- lastContentFuture =
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
+ @Override
+ public String preProcessRequestPath(String requestPath) {
+ // in case the files being accessed are logs or stdout files,
find appropriate paths.
+ if (requestPath.equals("/jobmanager/log") ||
requestPath.equals("/jobmanager/stdout")) {
+ return "";
} else {
- lastContentFuture = ctx.writeAndFlush(new
HttpChunkedInput(new ChunkedFile(raf, 0, fileLength, 8192)),
- ctx.newProgressivePromise());
- // HttpChunkedInput will write the end marker
(LastHttpContent) for us.
- }
-
- // close the connection, if no keep-alive is needed
- if (!HttpHeaders.isKeepAlive(request)) {
-
lastContentFuture.addListener(ChannelFutureListener.CLOSE);
+ return requestPath;
}
}
@Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
{
- if (ctx.channel().isActive()) {
- logger.error("Caught exception", cause);
- sendError(ctx, INTERNAL_SERVER_ERROR);
- }
- }
-
- //
------------------------------------------------------------------------
- // Utilities to encode headers and responses
- //
------------------------------------------------------------------------
-
- /**
- * Writes a simple error response message.
- *
- * @param ctx The channel context to write the response to.
- * @param status The response status.
- */
- private static void sendError(ChannelHandlerContext ctx,
HttpResponseStatus status) {
- FullHttpResponse response = new DefaultFullHttpResponse(
- HTTP_1_1, status,
Unpooled.copiedBuffer("Failure: " + status + "\r\n", CharsetUtil.UTF_8));
- response.headers().set(CONTENT_TYPE, "text/plain;
charset=UTF-8");
-
- // close the connection as soon as the error message is sent.
-
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
- }
-
- /**
- * Send the "304 Not Modified" response. This response can be used when
the
- * file timestamp is the same as what the browser is sending up.
- *
- * @param ctx The channel context to write the response to.
- */
- private static void sendNotModified(ChannelHandlerContext ctx) {
- FullHttpResponse response = new
DefaultFullHttpResponse(HTTP_1_1, NOT_MODIFIED);
- setDateHeader(response);
-
- // close the connection as soon as the error message is sent.
-
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
- }
-
- /**
- * Sets the "date" header for the HTTP response.
- *
- * @param response HTTP response
- */
- private static void setDateHeader(FullHttpResponse response) {
- SimpleDateFormat dateFormatter = new
SimpleDateFormat(HTTP_DATE_FORMAT, Locale.US);
- dateFormatter.setTimeZone(GMT_TIMEZONE);
-
- Calendar time = new GregorianCalendar();
- response.headers().set(DATE,
dateFormatter.format(time.getTime()));
- }
-
- /**
- * Sets the "date" and "cache" headers for the HTTP Response.
- *
- * @param response The HTTP response object.
- * @param fileToCache File to extract the modification timestamp from.
- */
- private static void setDateAndCacheHeaders(HttpResponse response, File
fileToCache) {
- SimpleDateFormat dateFormatter = new
SimpleDateFormat(HTTP_DATE_FORMAT, Locale.US);
- dateFormatter.setTimeZone(GMT_TIMEZONE);
-
- // date header
- Calendar time = new GregorianCalendar();
- response.headers().set(DATE,
dateFormatter.format(time.getTime()));
-
- // cache headers
- time.add(Calendar.SECOND, HTTP_CACHE_SECONDS);
- response.headers().set(EXPIRES,
dateFormatter.format(time.getTime()));
- response.headers().set(CACHE_CONTROL, "private, max-age=" +
HTTP_CACHE_SECONDS);
- response.headers().set(LAST_MODIFIED, dateFormatter.format(new
Date(fileToCache.lastModified())));
- }
-
- /**
- * Sets the content type header for the HTTP Response.
- *
- * @param response HTTP response
- * @param file file to extract content type
- */
- private static void setContentTypeHeader(HttpResponse response, File
file) {
- String mimeType =
MimeTypes.getMimeTypeForFileName(file.getName());
- String mimeFinal = mimeType != null ? mimeType :
MimeTypes.getDefaultMimeType();
- response.headers().set(CONTENT_TYPE, mimeFinal);
+ protected boolean shouldCache(String requestPath) {
+ return !(requestPath.contains("log") ||
requestPath.contains("out"));
--- End diff --
Should we make this check more explicit, in order to prevent accidental
non-caching of requests whose path contains "out" or "log" for another reason?
> Create a Flink History Server
> -----------------------------
>
> Key: FLINK-1579
> URL: https://issues.apache.org/jira/browse/FLINK-1579
> Project: Flink
> Issue Type: New Feature
> Components: Distributed Coordination
> Affects Versions: 0.9
> Reporter: Robert Metzger
> Assignee: Chesnay Schepler
>
> Right now it's not possible to analyze the job results for jobs that ran on
> YARN, because we'll lose the information once the JobManager has stopped.
> Therefore, I propose to implement a "Flink History Server" which serves the
> results from these jobs.
> I haven't started thinking about the implementation, but I suspect it
> involves some JSON files stored in HDFS :)
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)