Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6178#discussion_r196740018
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 ---
    @@ -70,51 +84,103 @@ public FileUploadHandler(final Path uploadDir) {
     
        @Override
        protected void channelRead0(final ChannelHandlerContext ctx, final 
HttpObject msg) throws Exception {
    -           if (msg instanceof HttpRequest) {
    -                   final HttpRequest httpRequest = (HttpRequest) msg;
    -                   if (httpRequest.getMethod().equals(HttpMethod.POST)) {
    -                           if 
(HttpPostRequestDecoder.isMultipart(httpRequest)) {
    -                                   currentHttpPostRequestDecoder = new 
HttpPostRequestDecoder(DATA_FACTORY, httpRequest);
    -                                   currentHttpRequest = httpRequest;
    +           try {
    +                   if (msg instanceof HttpRequest) {
    +                           final HttpRequest httpRequest = (HttpRequest) 
msg;
    +                           LOG.trace("Received request. URL:{} Method:{}", 
httpRequest.getUri(), httpRequest.getMethod());
    +                           if 
(httpRequest.getMethod().equals(HttpMethod.POST)) {
    +                                   if 
(HttpPostRequestDecoder.isMultipart(httpRequest)) {
    +                                           currentHttpPostRequestDecoder = 
new HttpPostRequestDecoder(DATA_FACTORY, httpRequest);
    +                                           currentHttpRequest = 
httpRequest;
    +                                           currentUploadDir = 
Files.createDirectory(uploadDir.resolve(UUID.randomUUID().toString()));
    +                                   } else {
    +                                           ctx.fireChannelRead(msg);
    +                                   }
                                } else {
                                        ctx.fireChannelRead(msg);
                                }
    +                   } else if (msg instanceof HttpContent && 
currentHttpPostRequestDecoder != null) {
    +                           // make sure that we still have a upload dir in 
case that it got deleted in the meanwhile
    +                           RestServerEndpoint.createUploadDir(uploadDir, 
LOG);
    +
    +                           final HttpContent httpContent = (HttpContent) 
msg;
    +                           
currentHttpPostRequestDecoder.offer(httpContent);
    +
    +                           while (currentHttpPostRequestDecoder.hasNext()) 
{
    +                                   final InterfaceHttpData data = 
currentHttpPostRequestDecoder.next();
    +                                   if (data.getHttpDataType() == 
InterfaceHttpData.HttpDataType.FileUpload) {
    +                                           final DiskFileUpload fileUpload 
= (DiskFileUpload) data;
    +                                           
checkState(fileUpload.isCompleted());
    +
    +                                           final Path dest = 
currentUploadDir.resolve(fileUpload.getFilename());
    +                                           
fileUpload.renameTo(dest.toFile());
    +                                   } else if (data.getHttpDataType() == 
InterfaceHttpData.HttpDataType.Attribute) {
    +                                           final Attribute request = 
(Attribute) data;
    +                                           // this could also be 
implemented by using the first found Attribute as the payload
    +                                           if 
(data.getName().equals(HTTP_ATTRIBUTE_REQUEST)) {
    +                                                   currentJsonPayload = 
request.get();
    +                                           } else {
    +                                                   LOG.warn("Received 
unknown attribute {}.", data.getName());
    +                                                   
HandlerUtils.sendErrorResponse(
    +                                                           ctx,
    +                                                           
currentHttpRequest,
    +                                                           new 
ErrorResponseBody("Received unknown attribute " + data.getName() + '.'),
    +                                                           
HttpResponseStatus.BAD_REQUEST,
    +                                                           
Collections.emptyMap()
    +                                                   );
    +                                                   deleteUploadedFiles();
    +                                                   reset();
    +                                                   return;
    +                                           }
    +                                   }
    +                           }
    +
    +                           if (httpContent instanceof LastHttpContent) {
    +                                   
ctx.channel().attr(UPLOADED_FILES).set(new 
FileUploads(Collections.singleton(currentUploadDir)));
    +                                   ctx.fireChannelRead(currentHttpRequest);
    +                                   if (currentJsonPayload != null) {
    +                                           
ctx.fireChannelRead(httpContent.replace(Unpooled.wrappedBuffer(currentJsonPayload)));
    +                                   } else {
    +                                           
ctx.fireChannelRead(httpContent);
    +                                   }
    +                                   reset();
    +                           }
                        } else {
                                ctx.fireChannelRead(msg);
                        }
    -           } else if (msg instanceof HttpContent && 
currentHttpPostRequestDecoder != null) {
    -                   // make sure that we still have a upload dir in case 
that it got deleted in the meanwhile
    -                   RestServerEndpoint.createUploadDir(uploadDir, LOG);
    -
    -                   final HttpContent httpContent = (HttpContent) msg;
    -                   currentHttpPostRequestDecoder.offer(httpContent);
    -
    -                   while (currentHttpPostRequestDecoder.hasNext()) {
    -                           final InterfaceHttpData data = 
currentHttpPostRequestDecoder.next();
    -                           if (data.getHttpDataType() == 
InterfaceHttpData.HttpDataType.FileUpload) {
    -                                   final DiskFileUpload fileUpload = 
(DiskFileUpload) data;
    -                                   checkState(fileUpload.isCompleted());
    -
    -                                   final Path dest = 
uploadDir.resolve(Paths.get(UUID.randomUUID() +
    -                                           "_" + 
fileUpload.getFilename()));
    -                                   fileUpload.renameTo(dest.toFile());
    -                                   
ctx.channel().attr(UPLOADED_FILE).set(dest);
    -                           }
    -                   }
    +           } catch (Exception e) {
    +                   LOG.warn("Internal server error. File upload failed.", 
e);
    +                   HandlerUtils.sendErrorResponse(
    +                           ctx,
    +                           currentHttpRequest,
    +                           new ErrorResponseBody("File upload failed."),
    +                           HttpResponseStatus.INTERNAL_SERVER_ERROR,
    +                           Collections.emptyMap()
    +                   );
    +                   deleteUploadedFiles();
    +                   reset();
    +           }
    +   }
     
    -                   if (httpContent instanceof LastHttpContent) {
    -                           ctx.fireChannelRead(currentHttpRequest);
    -                           ctx.fireChannelRead(httpContent);
    -                           reset();
    +   private void deleteUploadedFiles() {
    +           if (currentUploadDir != null) {
    +                   try (FileUploads uploads = new 
FileUploads(Collections.singleton(currentUploadDir))) {
    --- End diff --
    
    Why do we create a `FileUploads` instance instead of simply calling 
`FileUtils.deleteDirectory(currentUploadDir)`?


---

Reply via email to