This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch release-1.6 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 994b18518fb1534f6a62491e5149770b8667f68e Author: zentol <ches...@apache.org> AuthorDate: Tue Aug 21 12:37:13 2018 +0200 [FLINK-10115][rest] Ignore content-length limit for FileUploads --- .../java/org/apache/flink/runtime/rest/FileUploadHandler.java | 11 +++++++++-- .../apache/flink/runtime/rest/MultipartUploadResource.java | 2 ++ 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java index 11ff00a..7c46af0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.rest; import org.apache.flink.runtime.rest.handler.FileUploads; import org.apache.flink.runtime.rest.handler.util.HandlerUtils; import org.apache.flink.runtime.rest.messages.ErrorResponseBody; +import org.apache.flink.runtime.rest.util.RestConstants; import org.apache.flink.util.FileUtils; import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled; @@ -29,6 +30,7 @@ import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler; import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPipeline; import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpContent; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObject; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest; @@ -140,11 +142,16 @@ public class FileUploadHandler extends SimpleChannelInboundHandler<HttpObject> { if (httpContent instanceof LastHttpContent) { LOG.trace("Finalizing multipart file upload."); ctx.channel().attr(UPLOADED_FILES).set(new FileUploads(currentUploadDir)); - ctx.fireChannelRead(currentHttpRequest); if (currentJsonPayload != null) { + currentHttpRequest.headers().set(HttpHeaders.Names.CONTENT_LENGTH, currentJsonPayload.length); + currentHttpRequest.headers().set(HttpHeaders.Names.CONTENT_TYPE, RestConstants.REST_CONTENT_TYPE); + ctx.fireChannelRead(currentHttpRequest); ctx.fireChannelRead(httpContent.replace(Unpooled.wrappedBuffer(currentJsonPayload))); } else { - ctx.fireChannelRead(ReferenceCountUtil.retain(httpContent)); + currentHttpRequest.headers().set(HttpHeaders.Names.CONTENT_LENGTH, 0); + currentHttpRequest.headers().remove(HttpHeaders.Names.CONTENT_TYPE); + ctx.fireChannelRead(currentHttpRequest); + ctx.fireChannelRead(LastHttpContent.EMPTY_LAST_CONTENT); } reset(); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java index 0153d5d..c350393 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java @@ -102,6 +102,8 @@ public class MultipartUploadResource extends ExternalResource { Configuration config = new Configuration(); config.setInteger(RestOptions.PORT, 0); config.setString(RestOptions.ADDRESS, "localhost"); + // set this to a lower value on purpose to test that files larger than the content limit are still accepted + config.setInteger(RestOptions.SERVER_MAX_CONTENT_LENGTH, 1024 * 1024); configuredUploadDir = temporaryFolder.newFolder().toPath(); config.setString(WebOptions.UPLOAD_DIR, configuredUploadDir.toString());