[ 
https://issues.apache.org/jira/browse/FLINK-1579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15897495#comment-15897495
 ] 

ASF GitHub Bot commented on FLINK-1579:
---------------------------------------

Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3460#discussion_r104431499
  
    --- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
 ---
    @@ -205,215 +125,18 @@ public void channelRead0(ChannelHandlerContext ctx, 
Routed routed) throws Except
                }
        }
     
    -   /**
    -    * Response when running with leading JobManager.
    -    */
    -   private void respondAsLeader(ChannelHandlerContext ctx, HttpRequest 
request, String requestPath)
    -                   throws IOException, ParseException, URISyntaxException {
    -
    -           // convert to absolute path
    -           final File file = new File(rootPath, requestPath);
    -
    -           if (!file.exists()) {
    -                   // file does not exist. Try to load it with the 
classloader
    -                   ClassLoader cl = 
StaticFileServerHandler.class.getClassLoader();
    -
    -                   try(InputStream resourceStream = 
cl.getResourceAsStream("web" + requestPath)) {
    -                           boolean success = false;
    -                           try {
    -                                   if (resourceStream != null) {
    -                                           URL root = 
cl.getResource("web");
    -                                           URL requested = 
cl.getResource("web" + requestPath);
    -
    -                                           if (root != null && requested 
!= null) {
    -                                                   URI rootURI = new 
URI(root.getPath()).normalize();
    -                                                   URI requestedURI = new 
URI(requested.getPath()).normalize();
    -
    -                                                   // Check that we don't 
load anything from outside of the
    -                                                   // expected scope.
    -                                                   if 
(!rootURI.relativize(requestedURI).equals(requestedURI)) {
    -                                                           
logger.debug("Loading missing file from classloader: {}", requestPath);
    -                                                           // ensure that 
directory to file exists.
    -                                                           
file.getParentFile().mkdirs();
    -                                                           
Files.copy(resourceStream, file.toPath());
    -
    -                                                           success = true;
    -                                                   }
    -                                           }
    -                                   }
    -                           } catch (Throwable t) {
    -                                   logger.error("error while responding", 
t);
    -                           } finally {
    -                                   if (!success) {
    -                                           logger.debug("Unable to load 
requested file {} from classloader", requestPath);
    -                                           sendError(ctx, NOT_FOUND);
    -                                           return;
    -                                   }
    -                           }
    -                   }
    -           }
    -
    -           if (!file.exists() || file.isHidden() || file.isDirectory() || 
!file.isFile()) {
    -                   sendError(ctx, NOT_FOUND);
    -                   return;
    -           }
    -
    -           if 
(!file.getCanonicalFile().toPath().startsWith(rootPath.toPath())) {
    -                   sendError(ctx, NOT_FOUND);
    -                   return;
    -           }
    -
    -           // cache validation
    -           final String ifModifiedSince = 
request.headers().get(IF_MODIFIED_SINCE);
    -           if (ifModifiedSince != null && !ifModifiedSince.isEmpty()) {
    -                   SimpleDateFormat dateFormatter = new 
SimpleDateFormat(HTTP_DATE_FORMAT, Locale.US);
    -                   Date ifModifiedSinceDate = 
dateFormatter.parse(ifModifiedSince);
    -
    -                   // Only compare up to the second because the datetime 
format we send to the client
    -                   // does not have milliseconds
    -                   long ifModifiedSinceDateSeconds = 
ifModifiedSinceDate.getTime() / 1000;
    -                   long fileLastModifiedSeconds = file.lastModified() / 
1000;
    -                   if (ifModifiedSinceDateSeconds == 
fileLastModifiedSeconds) {
    -                           if (logger.isDebugEnabled()) {
    -                                   logger.debug("Responding 'NOT MODIFIED' 
for file '" + file.getAbsolutePath() + '\'');
    -                           }
    -
    -                           sendNotModified(ctx);
    -                           return;
    -                   }
    -           }
    -           
    -           if (logger.isDebugEnabled()) {
    -                   logger.debug("Responding with file '" + 
file.getAbsolutePath() + '\'');
    -           }
    -
    -           // Don't need to close this manually. Netty's DefaultFileRegion 
will take care of it.
    -           final RandomAccessFile raf;
    -           try {
    -                   raf = new RandomAccessFile(file, "r");
    -           }
    -           catch (FileNotFoundException e) {
    -                   sendError(ctx, NOT_FOUND);
    -                   return;
    -           }
    -           long fileLength = raf.length();
    -
    -           HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
    -           setContentTypeHeader(response, file);
    -
    -           // since the log and out files are rapidly changing, we don't 
want to browser to cache them
    -           if (!(requestPath.contains("log") || 
requestPath.contains("out"))) {
    -                   setDateAndCacheHeaders(response, file);
    -           }
    -           if (HttpHeaders.isKeepAlive(request)) {
    -                   response.headers().set(CONNECTION, 
HttpHeaders.Values.KEEP_ALIVE);
    -           }
    -           HttpHeaders.setContentLength(response, fileLength);
    -
    -           // write the initial line and the header.
    -           ctx.write(response);
    -
    -           // write the content.
    -           ChannelFuture lastContentFuture;
    -           if (ctx.pipeline().get(SslHandler.class) == null) {
    -                   ctx.write(new DefaultFileRegion(raf.getChannel(), 0, 
fileLength), ctx.newProgressivePromise());
    -                   lastContentFuture = 
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
    +   @Override
    +   public String preProcessRequestPath(String requestPath) {
    +           // in case the files being accessed are logs or stdout files, 
find appropriate paths.
    +           if (requestPath.equals("/jobmanager/log") || 
requestPath.equals("/jobmanager/stdout")) {
    +                   return "";
                } else {
    -                   lastContentFuture = ctx.writeAndFlush(new 
HttpChunkedInput(new ChunkedFile(raf, 0, fileLength, 8192)),
    -                           ctx.newProgressivePromise());
    -                   // HttpChunkedInput will write the end marker 
(LastHttpContent) for us.
    -           }
    -
    -           // close the connection, if no keep-alive is needed
    -           if (!HttpHeaders.isKeepAlive(request)) {
    -                   
lastContentFuture.addListener(ChannelFutureListener.CLOSE);
    +                   return requestPath;
                }
        }
     
        @Override
    -   public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 
{
    -           if (ctx.channel().isActive()) {
    -                   logger.error("Caught exception", cause);
    -                   sendError(ctx, INTERNAL_SERVER_ERROR);
    -           }
    -   }
    -
    -   // 
------------------------------------------------------------------------
    -   //  Utilities to encode headers and responses
    -   // 
------------------------------------------------------------------------
    -
    -   /**
    -    * Writes a simple  error response message.
    -    *
    -    * @param ctx    The channel context to write the response to.
    -    * @param status The response status.
    -    */
    -   private static void sendError(ChannelHandlerContext ctx, 
HttpResponseStatus status) {
    -           FullHttpResponse response = new DefaultFullHttpResponse(
    -                           HTTP_1_1, status, 
Unpooled.copiedBuffer("Failure: " + status + "\r\n", CharsetUtil.UTF_8));
    -           response.headers().set(CONTENT_TYPE, "text/plain; 
charset=UTF-8");
    -
    -           // close the connection as soon as the error message is sent.
    -           
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
    -   }
    -
    -   /**
    -    * Send the "304 Not Modified" response. This response can be used when 
the
    -    * file timestamp is the same as what the browser is sending up.
    -    *
    -    * @param ctx The channel context to write the response to.
    -    */
    -   private static void sendNotModified(ChannelHandlerContext ctx) {
    -           FullHttpResponse response = new 
DefaultFullHttpResponse(HTTP_1_1, NOT_MODIFIED);
    -           setDateHeader(response);
    -
    -           // close the connection as soon as the error message is sent.
    -           
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
    -   }
    -
    -   /**
    -    * Sets the "date" header for the HTTP response.
    -    *
    -    * @param response HTTP response
    -    */
    -   private static void setDateHeader(FullHttpResponse response) {
    -           SimpleDateFormat dateFormatter = new 
SimpleDateFormat(HTTP_DATE_FORMAT, Locale.US);
    -           dateFormatter.setTimeZone(GMT_TIMEZONE);
    -
    -           Calendar time = new GregorianCalendar();
    -           response.headers().set(DATE, 
dateFormatter.format(time.getTime()));
    -   }
    -
    -   /**
    -    * Sets the "date" and "cache" headers for the HTTP Response.
    -    *
    -    * @param response    The HTTP response object.
    -    * @param fileToCache File to extract the modification timestamp from.
    -    */
    -   private static void setDateAndCacheHeaders(HttpResponse response, File 
fileToCache) {
    -           SimpleDateFormat dateFormatter = new 
SimpleDateFormat(HTTP_DATE_FORMAT, Locale.US);
    -           dateFormatter.setTimeZone(GMT_TIMEZONE);
    -
    -           // date header
    -           Calendar time = new GregorianCalendar();
    -           response.headers().set(DATE, 
dateFormatter.format(time.getTime()));
    -
    -           // cache headers
    -           time.add(Calendar.SECOND, HTTP_CACHE_SECONDS);
    -           response.headers().set(EXPIRES, 
dateFormatter.format(time.getTime()));
    -           response.headers().set(CACHE_CONTROL, "private, max-age=" + 
HTTP_CACHE_SECONDS);
    -           response.headers().set(LAST_MODIFIED, dateFormatter.format(new 
Date(fileToCache.lastModified())));
    -   }
    -
    -   /**
    -    * Sets the content type header for the HTTP Response.
    -    *
    -    * @param response HTTP response
    -    * @param file     file to extract content type
    -    */
    -   private static void setContentTypeHeader(HttpResponse response, File 
file) {
    -           String mimeType = 
MimeTypes.getMimeTypeForFileName(file.getName());
    -           String mimeFinal = mimeType != null ? mimeType : 
MimeTypes.getDefaultMimeType();
    -           response.headers().set(CONTENT_TYPE, mimeFinal);
    +   protected boolean shouldCache(String requestPath) {
    +           return !(requestPath.contains("log") || 
requestPath.contains("out"));
    --- End diff --
    
    Should we make this more explicit in order to prevent accidental 
non-caching of requests that contain out or log for another reason?


> Create a Flink History Server
> -----------------------------
>
>                 Key: FLINK-1579
>                 URL: https://issues.apache.org/jira/browse/FLINK-1579
>             Project: Flink
>          Issue Type: New Feature
>          Components: Distributed Coordination
>    Affects Versions: 0.9
>            Reporter: Robert Metzger
>            Assignee: Chesnay Schepler
>
> Right now its not possible to analyze the job results for jobs that ran on 
> YARN, because we'll loose the information once the JobManager has stopped.
> Therefore, I propose to implement a "Flink History Server" which serves  the 
> results from these jobs.
> I haven't started thinking about the implementation, but I suspect it 
> involves some JSON files stored in HDFS :)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to