This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch release-1.14 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 69b6e60da295a47eb449811c64c8a19733b27673 Author: Chesnay Schepler <[email protected]> AuthorDate: Thu Sep 9 16:54:16 2021 +0200 [FLINK-24197] Guard against CRLF being split across chunks This commit adds a defense mechanism against https://github.com/netty/netty/issues/11668. If the CRLF prefix of a multipart delimiter is split across 2 chunks, an exception is thrown because Netty incorrectly treats CR as data. If CR is the last byte of the current chunk, then we pessimistically assume that this case occurred. We exclude the trailing CR from the current chunk, and add it to the front of the next received chunk. --- .../flink/runtime/rest/FileUploadHandler.java | 32 +++++++++++++++++++++- .../runtime/rest/MultipartUploadResource.java | 8 +++--- 2 files changed, 35 insertions(+), 5 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java index 8dacb09..3367ab1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java @@ -24,11 +24,13 @@ import org.apache.flink.runtime.rest.messages.ErrorResponseBody; import org.apache.flink.runtime.rest.util.RestConstants; import org.apache.flink.util.FileUtils; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled; import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler; import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPipeline; import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpConstants; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpContent; import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod; @@ -85,6 +87,8 @@ public class FileUploadHandler extends SimpleChannelInboundHandler<HttpObject> { private byte[] currentJsonPayload; private Path currentUploadDir; + private boolean addCRPrefix = false; + public FileUploadHandler(final Path uploadDir) { super(true); @@ -142,7 +146,33 @@ public class FileUploadHandler extends SimpleChannelInboundHandler<HttpObject> { // meanwhile RestServerEndpoint.createUploadDir(uploadDir, LOG, false); - final HttpContent httpContent = (HttpContent) msg; + HttpContent httpContent = (HttpContent) msg; + final ByteBuf content = httpContent.content(); + + // the following is a defense mechanism against + // https://github.com/netty/netty/issues/11668 + // if the CRLF prefix of a multipart delimiter is split across 2 chunks an exception + // is thrown + // if CR is the last byte of the current chunk, then we pessimistically assume that + // this case occurred + // exclude the trailing CR from the current chunk, and add it to the front of the + // next received chunk + if (addCRPrefix) { + final byte[] contentWithLeadingCR = new byte[1 + content.readableBytes()]; + contentWithLeadingCR[0] = HttpConstants.CR; + content.getBytes(0, contentWithLeadingCR, 1, content.readableBytes()); + + httpContent = httpContent.replace(Unpooled.wrappedBuffer(contentWithLeadingCR)); + addCRPrefix = false; + } else { + if (content.writerIndex() > 0 + && content.getByte(content.writerIndex() - 1) == HttpConstants.CR) { + // we may run into https://github.com/netty/netty/issues/11668 + // hide CR from this chunk, add it to the next received chunk + content.writerIndex(content.writerIndex() - 1); + addCRPrefix = true; + } + } currentHttpPostRequestDecoder.offer(httpContent); while (httpContent != LastHttpContent.EMPTY_LAST_CONTENT diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java index 2cd6ecf..c3f8919 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java @@ -62,7 +62,6 @@ import java.util.Collections; import java.util.Comparator; import java.util.List; import java.util.Objects; -import java.util.Random; import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -112,7 +111,8 @@ public class MultipartUploadResource extends ExternalResource { file1 = temporaryFolder.newFile(); try (RandomAccessFile rw = new RandomAccessFile(file1, "rw")) { - rw.setLength(1024 * 1024 * 64); + // magic value that reliably reproduced https://github.com/netty/netty/issues/11668 + rw.setLength(5043444); } file2 = temporaryFolder.newFile(); Files.write(file2.toPath(), "world".getBytes(ConfigConstants.DEFAULT_CHARSET)); @@ -455,13 +455,13 @@ public class MultipartUploadResource extends ExternalResource { /** Simple test {@link RequestBody}. */ protected static final class TestRequestBody implements RequestBody { private static final String FIELD_NAME_INDEX = "index"; - private static final Random RANDOM = new Random(); @JsonProperty(FIELD_NAME_INDEX) private final int index; TestRequestBody() { - this(RANDOM.nextInt()); + // magic value that reliably reproduced https://github.com/netty/netty/issues/11668 + this(-766974635); } @JsonCreator
