Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/6147#discussion_r195101162
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
---
@@ -91,19 +111,49 @@ protected void channelRead0(final
ChannelHandlerContext ctx, final HttpObject ms
while (httpContent !=
LastHttpContent.EMPTY_LAST_CONTENT && currentHttpPostRequestDecoder.hasNext()) {
final InterfaceHttpData data =
currentHttpPostRequestDecoder.next();
- if (data.getHttpDataType() ==
InterfaceHttpData.HttpDataType.FileUpload) {
- final DiskFileUpload fileUpload =
(DiskFileUpload) data;
- checkState(fileUpload.isCompleted());
-
- final Path dest =
uploadDir.resolve(Paths.get(UUID.randomUUID() +
- "_" +
fileUpload.getFilename()));
- fileUpload.renameTo(dest.toFile());
-
ctx.channel().attr(UPLOADED_FILE).set(dest);
+ if
(currentHttpRequest.getUri().equals(JobSubmitHeaders.getInstance().getTargetRestEndpointURL()))
{
--- End diff --
I think the `FileUploadHandler` should not know about the
`JobSubmitHandler`. Instead it should only be responsible for receiving
uploaded files, storing them in a temp directory and then making them
accessible to a downstream handler (e.g. through an `Attribute` in the
`AttributeMap`). In order to defer the deserialization of the Json part of the
payload, we could create a new `HttpRequest` which contains exactly the data
sent as a `MemoryAttribute` (the branch which matches
`InterfaceHttpData.HttpDataType.Attribute`).
---