zentol closed pull request #6650: [1.6][FLINK-10115][rest] Ignore content-length limit for FileUploads
URL: https://github.com/apache/flink/pull/6650
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java
index 0d8605abc99..3d1ec9d0066 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java
@@ -145,6 +145,7 @@ protected void respondAsLeader(ChannelHandlerContext ctx, RoutedRequest routedRe
 					hre);
 			}
+			log.trace("Starting request processing.");
 			CompletableFuture<Void> requestProcessingFuture =
 				respondToRequest(
 					ctx,
 					httpRequest,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
index d6287507697..7c46af04b55 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
@@ -21,6 +21,7 @@
 import org.apache.flink.runtime.rest.handler.FileUploads;
 import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
 import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
+import org.apache.flink.runtime.rest.util.RestConstants;
 import org.apache.flink.util.FileUtils;
 
 import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
@@ -29,6 +30,7 @@
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPipeline;
 import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpContent;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObject;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
@@ -94,6 +96,7 @@ protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject ms
 			LOG.trace("Received request. URL:{} Method:{}", httpRequest.getUri(), httpRequest.getMethod());
 			if (httpRequest.getMethod().equals(HttpMethod.POST)) {
 				if (HttpPostRequestDecoder.isMultipart(httpRequest)) {
+					LOG.trace("Initializing multipart file upload.");
 					checkState(currentHttpPostRequestDecoder == null);
 					checkState(currentHttpRequest == null);
 					checkState(currentUploadDir == null);
@@ -107,6 +110,7 @@ protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject ms
 					ctx.fireChannelRead(ReferenceCountUtil.retain(msg));
 				}
 			} else if (msg instanceof HttpContent && currentHttpPostRequestDecoder != null) {
+				LOG.trace("Received http content.");
 				// make sure that we still have a upload dir in case that it got deleted in the meanwhile
 				RestServerEndpoint.createUploadDir(uploadDir, LOG);
@@ -121,9 +125,11 @@ protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject ms
 
 							final Path dest = currentUploadDir.resolve(fileUpload.getFilename());
 							fileUpload.renameTo(dest.toFile());
+							LOG.trace("Upload of file {} complete.", fileUpload.getFilename());
 						} else if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.Attribute) {
 							final Attribute request = (Attribute) data;
 							// this could also be implemented by using the first found Attribute as the payload
+							LOG.trace("Upload of attribute {} complete.", request.getName());
 							if (data.getName().equals(HTTP_ATTRIBUTE_REQUEST)) {
 								currentJsonPayload = request.get();
 							} else {
@@ -134,12 +140,18 @@ protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject ms
 				}
 
 				if (httpContent instanceof LastHttpContent) {
+					LOG.trace("Finalizing multipart file upload.");
 					ctx.channel().attr(UPLOADED_FILES).set(new FileUploads(currentUploadDir));
-					ctx.fireChannelRead(currentHttpRequest);
 					if (currentJsonPayload != null) {
+						currentHttpRequest.headers().set(HttpHeaders.Names.CONTENT_LENGTH, currentJsonPayload.length);
+						currentHttpRequest.headers().set(HttpHeaders.Names.CONTENT_TYPE, RestConstants.REST_CONTENT_TYPE);
+						ctx.fireChannelRead(currentHttpRequest);
 						ctx.fireChannelRead(httpContent.replace(Unpooled.wrappedBuffer(currentJsonPayload)));
 					} else {
-						ctx.fireChannelRead(ReferenceCountUtil.retain(httpContent));
+						currentHttpRequest.headers().set(HttpHeaders.Names.CONTENT_LENGTH, 0);
+						currentHttpRequest.headers().remove(HttpHeaders.Names.CONTENT_TYPE);
+						ctx.fireChannelRead(currentHttpRequest);
+						ctx.fireChannelRead(LastHttpContent.EMPTY_LAST_CONTENT);
 					}
 					reset();
 				}
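Note on the change above: once the multipart decoder has written the file parts to disk, only the JSON attribute (if any) is forwarded down the pipeline, so the buffered request's Content-Length is rewritten to describe just that payload and the server's content-length limit no longer counts the uploaded files. A minimal sketch of that forwarding step, using plain Netty 4 types rather than Flink's shaded org.apache.flink.shaded.netty4 package; the class and method names below (MultipartForwardingSketch, forwardBufferedRequest) are illustrative and not part of the PR:

// Sketch only: plain Netty 4 is assumed here instead of Flink's shaded Netty.
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.DefaultLastHttpContent;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.LastHttpContent;

final class MultipartForwardingSketch {

	/** Re-emits the buffered request so that the length limit only ever sees the JSON part. */
	static void forwardBufferedRequest(ChannelHandlerContext ctx, HttpRequest bufferedRequest, byte[] jsonPayload) {
		if (jsonPayload != null) {
			// The files are already on disk; only the JSON payload continues down the
			// pipeline, so Content-Length must describe just that payload.
			// "application/json" stands in for RestConstants.REST_CONTENT_TYPE from the diff.
			bufferedRequest.headers().set(HttpHeaders.Names.CONTENT_LENGTH, jsonPayload.length);
			bufferedRequest.headers().set(HttpHeaders.Names.CONTENT_TYPE, "application/json");
			ctx.fireChannelRead(bufferedRequest);
			ctx.fireChannelRead(new DefaultLastHttpContent(Unpooled.wrappedBuffer(jsonPayload)));
		} else {
			// No JSON part: advertise an empty body so downstream handlers do not
			// wait for content that will never arrive.
			bufferedRequest.headers().set(HttpHeaders.Names.CONTENT_LENGTH, 0);
			bufferedRequest.headers().remove(HttpHeaders.Names.CONTENT_TYPE);
			ctx.fireChannelRead(bufferedRequest);
			ctx.fireChannelRead(LastHttpContent.EMPTY_LAST_CONTENT);
		}
	}
}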
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
index 0153d5dd31a..c350393371a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
@@ -102,6 +102,8 @@ public void before() throws Exception {
 		Configuration config = new Configuration();
 		config.setInteger(RestOptions.PORT, 0);
 		config.setString(RestOptions.ADDRESS, "localhost");
+		// set this to a lower value on purpose to test that files larger than the content limit are still accepted
+		config.setInteger(RestOptions.SERVER_MAX_CONTENT_LENGTH, 1024 * 1024);
 		configuredUploadDir = temporaryFolder.newFolder().toPath();
 		config.setString(WebOptions.UPLOAD_DIR, configuredUploadDir.toString());
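For context on the test change: the server's maximum content length is deliberately lowered to 1 MiB so that an uploaded file may exceed the limit and must still be accepted, since after this change only the JSON attribute is measured against it. A rough sketch of that configuration, using the Configuration/RestOptions API already referenced in the diff; the wrapper class name below is illustrative, not part of the PR:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;

final class ContentLimitConfigSketch {

	/** Builds a REST server configuration with an intentionally small content limit. */
	static Configuration restServerConfig() {
		Configuration config = new Configuration();
		config.setInteger(RestOptions.PORT, 0);
		config.setString(RestOptions.ADDRESS, "localhost");
		// Deliberately small limit: multipart file uploads larger than 1 MiB must
		// still be accepted, because only the JSON part is checked against it.
		config.setInteger(RestOptions.SERVER_MAX_CONTENT_LENGTH, 1024 * 1024);
		return config;
	}
}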
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services