This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch release-1.16 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 2d3c142eb036f2cf65b0a4f81caddd7e4c943fd5 Author: Chesnay Schepler <ches...@apache.org> AuthorDate: Fri Aug 18 14:13:28 2023 +0200 [FLINK-32888] Handle hasNext() throwing EndOfDataDecoderException --- .../flink/runtime/rest/FileUploadHandler.java | 12 ++++- .../runtime/rest/FileUploadHandlerITCase.java | 58 ++++++++++++++++++++++ 2 files changed, 69 insertions(+), 1 deletion(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java index c6dda2cdfb2..d60f48d68ee 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java @@ -148,7 +148,7 @@ public class FileUploadHandler extends SimpleChannelInboundHandler<HttpObject> { currentHttpPostRequestDecoder.offer(httpContent); while (httpContent != LastHttpContent.EMPTY_LAST_CONTENT - && currentHttpPostRequestDecoder.hasNext()) { + && hasNext(currentHttpPostRequestDecoder)) { final InterfaceHttpData data = currentHttpPostRequestDecoder.next(); if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.FileUpload) { final DiskFileUpload fileUpload = (DiskFileUpload) data; @@ -214,6 +214,16 @@ public class FileUploadHandler extends SimpleChannelInboundHandler<HttpObject> { } } + private static boolean hasNext(HttpPostRequestDecoder decoder) { + try { + return decoder.hasNext(); + } catch (HttpPostRequestDecoder.EndOfDataDecoderException e) { + // this can occur if the final chuck wasn't empty, but didn't contain any attribute data + // unfortunately the Netty APIs don't give us any way to check this + return false; + } + } + private void handleError( ChannelHandlerContext ctx, String errorMessage, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerITCase.java index 34d17955922..480b58da26e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerITCase.java @@ -24,6 +24,7 @@ import org.apache.flink.runtime.rest.messages.MessageHeaders; import org.apache.flink.runtime.rest.messages.RequestBody; import org.apache.flink.runtime.rest.util.RestMapperUtils; import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.testutils.junit.utils.TempDirUtils; import org.apache.flink.util.FileUtils; import org.apache.flink.util.TestLogger; import org.apache.flink.util.function.BiConsumerWithException; @@ -39,13 +40,16 @@ import okhttp3.Response; import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; import java.io.File; import java.io.IOException; +import java.io.RandomAccessFile; import java.io.StringWriter; import java.lang.reflect.Field; import java.nio.file.Path; import java.util.ArrayList; +import java.util.Collections; import java.util.Comparator; import java.util.Iterator; import java.util.LinkedHashSet; @@ -71,6 +75,8 @@ public class FileUploadHandlerITCase extends TestLogger { @Rule public final MultipartUploadResource multipartUpdateResource = new MultipartUploadResource(); + @Rule public final TemporaryFolder tmp = new TemporaryFolder(); + private static final ObjectMapper OBJECT_MAPPER = RestMapperUtils.getStrictObjectMapper(); @ClassRule @@ -115,6 +121,15 @@ public class FileUploadHandlerITCase extends TestLogger { return finalizeRequest(builder, headerUrl); } + private Request buildMixedRequest( + String headerUrl, MultipartUploadResource.TestRequestBody json, File file) + throws IOException { + MultipartBody.Builder builder = new MultipartBody.Builder(); + builder = addJsonPart(builder, json, FileUploadHandler.HTTP_ATTRIBUTE_REQUEST); + builder = addFilePart(builder, file, file.getName()); + return finalizeRequest(builder, headerUrl); + } + private Request buildMixedRequest( String headerUrl, MultipartUploadResource.TestRequestBody json) throws IOException { MultipartBody.Builder builder = new MultipartBody.Builder(); @@ -219,6 +234,49 @@ public class FileUploadHandlerITCase extends TestLogger { verifyNoFileIsRegisteredToDeleteOnExitHook(); } + /** + * This test checks for a specific multipart request chunk layout using a magic number. + * + * <p>These things are very susceptible to interference from other requests or parts of the + * payload; for example if the JSON payload increases by a single byte it can already break the + * number. Do not reuse the client. + * + * <p>To find the magic number you can define a static counter, and loop the test in the IDE + * (without forking!) while incrementing the counter on each run. + */ + @Test + public void testMixedMultipartEndOfDataDecoderExceptionHandling() throws Exception { + OkHttpClient client = createOkHttpClientWithNoTimeouts(); + + MultipartUploadResource.MultipartMixedHandler mixedHandler = + multipartUpdateResource.getMixedHandler(); + + MultipartUploadResource.TestRequestBody json = + new MultipartUploadResource.TestRequestBody(); + + File file = TempDirUtils.newFile(tmp.newFolder().toPath()); + try (RandomAccessFile rw = new RandomAccessFile(file, "rw")) { + // magic value that reliably reproduced EndOfDataDecoderException in hasNext() + rw.setLength(1424); + } + multipartUpdateResource.setFileUploadVerifier( + (handlerRequest, restfulGateway) -> + MultipartUploadResource.assertUploadedFilesEqual( + handlerRequest, Collections.singleton(file))); + + Request singleFileMixedRequest = + buildMixedRequest( + mixedHandler.getMessageHeaders().getTargetRestEndpointURL(), json, file); + try (Response response = client.newCall(singleFileMixedRequest).execute()) { + assertEquals( + mixedHandler.getMessageHeaders().getResponseStatusCode().code(), + response.code()); + assertEquals(json, mixedHandler.lastReceivedRequest); + } + + verifyNoFileIsRegisteredToDeleteOnExitHook(); + } + @Test public void testJsonMultipart() throws Exception { OkHttpClient client = createOkHttpClientWithNoTimeouts();