This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 69b6e60da295a47eb449811c64c8a19733b27673
Author: Chesnay Schepler <[email protected]>
AuthorDate: Thu Sep 9 16:54:16 2021 +0200

    [FLINK-24197] Guard against CLRF being split across chunks
    
    This commit adds a defense mechanism against 
https://github.com/netty/netty/issues/11668.
    If the CRLF prefix of a multipart delimiter is split across 2 chunks an 
exception is thrown because Netty incorrectly treats CR as data.
    If CR is the last byte of the current chunk, then we pessimistically assume 
that this case occurred.
    We exclude the trailing CR from the current chunk, and add it to the front 
of the next received chunk.
---
 .../flink/runtime/rest/FileUploadHandler.java      | 32 +++++++++++++++++++++-
 .../runtime/rest/MultipartUploadResource.java      |  8 +++---
 2 files changed, 35 insertions(+), 5 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
index 8dacb09..3367ab1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
@@ -24,11 +24,13 @@ import 
org.apache.flink.runtime.rest.messages.ErrorResponseBody;
 import org.apache.flink.runtime.rest.util.RestConstants;
 import org.apache.flink.util.FileUtils;
 
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
 import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPipeline;
 import 
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpConstants;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpContent;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod;
@@ -85,6 +87,8 @@ public class FileUploadHandler extends 
SimpleChannelInboundHandler<HttpObject> {
     private byte[] currentJsonPayload;
     private Path currentUploadDir;
 
+    private boolean addCRPrefix = false;
+
     public FileUploadHandler(final Path uploadDir) {
         super(true);
 
@@ -142,7 +146,33 @@ public class FileUploadHandler extends 
SimpleChannelInboundHandler<HttpObject> {
                 // meanwhile
                 RestServerEndpoint.createUploadDir(uploadDir, LOG, false);
 
-                final HttpContent httpContent = (HttpContent) msg;
+                HttpContent httpContent = (HttpContent) msg;
+                final ByteBuf content = httpContent.content();
+
+                // the following is a defense mechanism against
+                // https://github.com/netty/netty/issues/11668
+                // if the CRLF prefix of a multipart delimiter is split across 
2 chunks an exception
+                // is thrown
+                // if CR is the last byte of the current chunk, then we 
pessimistically assume that
+                // this case occurred
+                // exclude the trailing CR from the current chunk, and add it 
to the front of the
+                // next received chunk
+                if (addCRPrefix) {
+                    final byte[] contentWithLeadingCR = new byte[1 + 
content.readableBytes()];
+                    contentWithLeadingCR[0] = HttpConstants.CR;
+                    content.getBytes(0, contentWithLeadingCR, 1, 
content.readableBytes());
+
+                    httpContent = 
httpContent.replace(Unpooled.wrappedBuffer(contentWithLeadingCR));
+                    addCRPrefix = false;
+                } else {
+                    if (content.writerIndex() > 0
+                            && content.getByte(content.writerIndex() - 1) == 
HttpConstants.CR) {
+                        // we may run into 
https://github.com/netty/netty/issues/11668
+                        // hide CR from this chunk, add it to the next 
received chunk
+                        content.writerIndex(content.writerIndex() - 1);
+                        addCRPrefix = true;
+                    }
+                }
                 currentHttpPostRequestDecoder.offer(httpContent);
 
                 while (httpContent != LastHttpContent.EMPTY_LAST_CONTENT
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
index 2cd6ecf..c3f8919 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
@@ -62,7 +62,6 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Objects;
-import java.util.Random;
 import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -112,7 +111,8 @@ public class MultipartUploadResource extends 
ExternalResource {
 
         file1 = temporaryFolder.newFile();
         try (RandomAccessFile rw = new RandomAccessFile(file1, "rw")) {
-            rw.setLength(1024 * 1024 * 64);
+            // magic value that reliably reproduced 
https://github.com/netty/netty/issues/11668
+            rw.setLength(5043444);
         }
         file2 = temporaryFolder.newFile();
         Files.write(file2.toPath(), 
"world".getBytes(ConfigConstants.DEFAULT_CHARSET));
@@ -455,13 +455,13 @@ public class MultipartUploadResource extends 
ExternalResource {
     /** Simple test {@link RequestBody}. */
     protected static final class TestRequestBody implements RequestBody {
         private static final String FIELD_NAME_INDEX = "index";
-        private static final Random RANDOM = new Random();
 
         @JsonProperty(FIELD_NAME_INDEX)
         private final int index;
 
         TestRequestBody() {
-            this(RANDOM.nextInt());
+            // magic value that reliably reproduced 
https://github.com/netty/netty/issues/11668
+            this(-766974635);
         }
 
         @JsonCreator

Reply via email to