[
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518310#comment-16518310
]
ASF GitHub Bot commented on FLINK-9599:
---------------------------------------
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/6178#discussion_r196836575
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
---
@@ -70,51 +84,103 @@ public FileUploadHandler(final Path uploadDir) {
@Override
protected void channelRead0(final ChannelHandlerContext ctx, final
HttpObject msg) throws Exception {
- if (msg instanceof HttpRequest) {
- final HttpRequest httpRequest = (HttpRequest) msg;
- if (httpRequest.getMethod().equals(HttpMethod.POST)) {
- if
(HttpPostRequestDecoder.isMultipart(httpRequest)) {
- currentHttpPostRequestDecoder = new
HttpPostRequestDecoder(DATA_FACTORY, httpRequest);
- currentHttpRequest = httpRequest;
+ try {
+ if (msg instanceof HttpRequest) {
+ final HttpRequest httpRequest = (HttpRequest)
msg;
+ LOG.trace("Received request. URL:{} Method:{}",
httpRequest.getUri(), httpRequest.getMethod());
+ if
(httpRequest.getMethod().equals(HttpMethod.POST)) {
+ if
(HttpPostRequestDecoder.isMultipart(httpRequest)) {
+ currentHttpPostRequestDecoder =
new HttpPostRequestDecoder(DATA_FACTORY, httpRequest);
+ currentHttpRequest =
httpRequest;
+ currentUploadDir =
Files.createDirectory(uploadDir.resolve(UUID.randomUUID().toString()));
+ } else {
+ ctx.fireChannelRead(msg);
+ }
} else {
ctx.fireChannelRead(msg);
}
+ } else if (msg instanceof HttpContent &&
currentHttpPostRequestDecoder != null) {
+ // make sure that we still have a upload dir in
case that it got deleted in the meanwhile
+ RestServerEndpoint.createUploadDir(uploadDir,
LOG);
+
+ final HttpContent httpContent = (HttpContent)
msg;
+
currentHttpPostRequestDecoder.offer(httpContent);
+
+ while (currentHttpPostRequestDecoder.hasNext())
{
+ final InterfaceHttpData data =
currentHttpPostRequestDecoder.next();
+ if (data.getHttpDataType() ==
InterfaceHttpData.HttpDataType.FileUpload) {
+ final DiskFileUpload fileUpload
= (DiskFileUpload) data;
+
checkState(fileUpload.isCompleted());
+
+ final Path dest =
currentUploadDir.resolve(fileUpload.getFilename());
+
fileUpload.renameTo(dest.toFile());
+ } else if (data.getHttpDataType() ==
InterfaceHttpData.HttpDataType.Attribute) {
+ final Attribute request =
(Attribute) data;
+ // this could also be
implemented by using the first found Attribute as the payload
+ if
(data.getName().equals(HTTP_ATTRIBUTE_REQUEST)) {
+ currentJsonPayload =
request.get();
+ } else {
+ LOG.warn("Received
unknown attribute {}.", data.getName());
+
HandlerUtils.sendErrorResponse(
+ ctx,
+
currentHttpRequest,
+ new
ErrorResponseBody("Received unknown attribute " + data.getName() + '.'),
+
HttpResponseStatus.BAD_REQUEST,
+
Collections.emptyMap()
+ );
+ deleteUploadedFiles();
+ reset();
+ return;
+ }
+ }
+ }
+
+ if (httpContent instanceof LastHttpContent) {
+
ctx.channel().attr(UPLOADED_FILES).set(new
FileUploads(Collections.singleton(currentUploadDir)));
+ ctx.fireChannelRead(currentHttpRequest);
+ if (currentJsonPayload != null) {
+
ctx.fireChannelRead(httpContent.replace(Unpooled.wrappedBuffer(currentJsonPayload)));
+ } else {
+
ctx.fireChannelRead(httpContent);
+ }
+ reset();
+ }
} else {
ctx.fireChannelRead(msg);
}
- } else if (msg instanceof HttpContent &&
currentHttpPostRequestDecoder != null) {
- // make sure that we still have a upload dir in case
that it got deleted in the meanwhile
- RestServerEndpoint.createUploadDir(uploadDir, LOG);
-
- final HttpContent httpContent = (HttpContent) msg;
- currentHttpPostRequestDecoder.offer(httpContent);
-
- while (currentHttpPostRequestDecoder.hasNext()) {
- final InterfaceHttpData data =
currentHttpPostRequestDecoder.next();
- if (data.getHttpDataType() ==
InterfaceHttpData.HttpDataType.FileUpload) {
- final DiskFileUpload fileUpload =
(DiskFileUpload) data;
- checkState(fileUpload.isCompleted());
-
- final Path dest =
uploadDir.resolve(Paths.get(UUID.randomUUID() +
- "_" +
fileUpload.getFilename()));
- fileUpload.renameTo(dest.toFile());
-
ctx.channel().attr(UPLOADED_FILE).set(dest);
- }
- }
+ } catch (Exception e) {
+ LOG.warn("Internal server error. File upload failed.",
e);
+ HandlerUtils.sendErrorResponse(
+ ctx,
+ currentHttpRequest,
+ new ErrorResponseBody("File upload failed."),
+ HttpResponseStatus.INTERNAL_SERVER_ERROR,
+ Collections.emptyMap()
+ );
+ deleteUploadedFiles();
+ reset();
+ }
+ }
- if (httpContent instanceof LastHttpContent) {
- ctx.fireChannelRead(currentHttpRequest);
- ctx.fireChannelRead(httpContent);
- reset();
+ private void deleteUploadedFiles() {
+ if (currentUploadDir != null) {
+ try (FileUploads uploads = new
FileUploads(Collections.singleton(currentUploadDir))) {
--- End diff --
The idea was to define the cleanup logic in one place, in this case in the
`FileUploads` class.
> Implement generic mechanism to receive files via rest
> -----------------------------------------------------
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
> Issue Type: New Feature
> Components: REST
> Reporter: Chesnay Schepler
> Assignee: Chesnay Schepler
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
> * extend the RestClient to allow the upload of Files
> * extend FileUploadHandler to accept mixed multi-part requests (json + files)
> * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute,
> similar to the existing special case for the {{JarUploadHandler}}. The JSON
> body can be forwarded by replacing the incoming http requests with a simple
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)