This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 994b18518fb1534f6a62491e5149770b8667f68e
Author: zentol <ches...@apache.org>
AuthorDate: Tue Aug 21 12:37:13 2018 +0200

    [FLINK-10115][rest] Ignore content-length limit for FileUploads
---
 .../java/org/apache/flink/runtime/rest/FileUploadHandler.java | 11 +++++++++--
 .../apache/flink/runtime/rest/MultipartUploadResource.java    |  2 ++
 2 files changed, 11 insertions(+), 2 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
index 11ff00a..7c46af0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.rest;
 import org.apache.flink.runtime.rest.handler.FileUploads;
 import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
 import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
+import org.apache.flink.runtime.rest.util.RestConstants;
 import org.apache.flink.util.FileUtils;
 
 import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
@@ -29,6 +30,7 @@ import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPipeline;
 import 
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpContent;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObject;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
@@ -140,11 +142,16 @@ public class FileUploadHandler extends 
SimpleChannelInboundHandler<HttpObject> {
                                if (httpContent instanceof LastHttpContent) {
                                        LOG.trace("Finalizing multipart file 
upload.");
                                        
ctx.channel().attr(UPLOADED_FILES).set(new FileUploads(currentUploadDir));
-                                       ctx.fireChannelRead(currentHttpRequest);
                                        if (currentJsonPayload != null) {
+                                               
currentHttpRequest.headers().set(HttpHeaders.Names.CONTENT_LENGTH, 
currentJsonPayload.length);
+                                               
currentHttpRequest.headers().set(HttpHeaders.Names.CONTENT_TYPE, 
RestConstants.REST_CONTENT_TYPE);
+                                               
ctx.fireChannelRead(currentHttpRequest);
                                                
ctx.fireChannelRead(httpContent.replace(Unpooled.wrappedBuffer(currentJsonPayload)));
                                        } else {
-                                               
ctx.fireChannelRead(ReferenceCountUtil.retain(httpContent));
+                                               
currentHttpRequest.headers().set(HttpHeaders.Names.CONTENT_LENGTH, 0);
+                                               
currentHttpRequest.headers().remove(HttpHeaders.Names.CONTENT_TYPE);
+                                               
ctx.fireChannelRead(currentHttpRequest);
+                                               
ctx.fireChannelRead(LastHttpContent.EMPTY_LAST_CONTENT);
                                        }
                                        reset();
                                }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
index 0153d5d..c350393 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
@@ -102,6 +102,8 @@ public class MultipartUploadResource extends 
ExternalResource {
                Configuration config = new Configuration();
                config.setInteger(RestOptions.PORT, 0);
                config.setString(RestOptions.ADDRESS, "localhost");
+               // set this to a lower value on purpose to test that files 
larger than the content limit are still accepted
+               config.setInteger(RestOptions.SERVER_MAX_CONTENT_LENGTH, 1024 * 
1024);
                configuredUploadDir = temporaryFolder.newFolder().toPath();
                config.setString(WebOptions.UPLOAD_DIR, 
configuredUploadDir.toString());
 

Reply via email to