This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9546f8243a24e7b45582b6de6702f819f1d73f97
Author: Chesnay Schepler <ches...@apache.org>
AuthorDate: Thu Aug 17 10:46:54 2023 +0200

    [FLINK-32888] Handle hasNext() throwing EndOfDataDecoderException
    
    This _probably_ happens when non-empty HTTP content is received that does 
not contain any attribute data.
---
 .../flink/runtime/rest/FileUploadHandler.java      | 12 ++++-
 .../runtime/rest/FileUploadHandlerITCase.java      | 56 ++++++++++++++++++++++
 2 files changed, 67 insertions(+), 1 deletion(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
index c3b797bcf72..c9e1fd78d74 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
@@ -148,7 +148,7 @@ public class FileUploadHandler extends 
SimpleChannelInboundHandler<HttpObject> {
                 currentHttpPostRequestDecoder.offer(httpContent);
 
                 while (httpContent != LastHttpContent.EMPTY_LAST_CONTENT
-                        && currentHttpPostRequestDecoder.hasNext()) {
+                        && hasNext(currentHttpPostRequestDecoder)) {
                     final InterfaceHttpData data = 
currentHttpPostRequestDecoder.next();
                     if (data.getHttpDataType() == 
InterfaceHttpData.HttpDataType.FileUpload) {
                         final DiskFileUpload fileUpload = (DiskFileUpload) 
data;
@@ -212,6 +212,16 @@ public class FileUploadHandler extends 
SimpleChannelInboundHandler<HttpObject> {
         }
     }
 
+    private static boolean hasNext(HttpPostRequestDecoder decoder) {
+        try {
+            return decoder.hasNext();
+        } catch (HttpPostRequestDecoder.EndOfDataDecoderException e) {
+            // this can occur if the final chunk wasn't empty, but didn't 
contain any attribute data
+            // unfortunately the Netty APIs don't give us any way to check this
+            return false;
+        }
+    }
+
     private void handleError(
             ChannelHandlerContext ctx,
             String errorMessage,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerITCase.java
index 75879a9cea7..45d14f1a444 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerITCase.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.rest.messages.MessageHeaders;
 import org.apache.flink.runtime.rest.messages.RequestBody;
 import org.apache.flink.runtime.rest.util.RestMapperUtils;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.testutils.junit.utils.TempDirUtils;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.function.BiConsumerWithException;
 
@@ -42,10 +43,12 @@ import org.junit.jupiter.api.io.TempDir;
 
 import java.io.File;
 import java.io.IOException;
+import java.io.RandomAccessFile;
 import java.io.StringWriter;
 import java.lang.reflect.Field;
 import java.nio.file.Path;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.LinkedHashSet;
@@ -123,6 +126,15 @@ class FileUploadHandlerITCase {
         return finalizeRequest(builder, headerUrl);
     }
 
+    private Request buildMixedRequest(
+            String headerUrl, MultipartUploadExtension.TestRequestBody json, 
File file)
+            throws IOException {
+        MultipartBody.Builder builder = new MultipartBody.Builder();
+        builder = addJsonPart(builder, json, 
FileUploadHandler.HTTP_ATTRIBUTE_REQUEST);
+        builder = addFilePart(builder, file, file.getName());
+        return finalizeRequest(builder, headerUrl);
+    }
+
     private Request buildMixedRequest(
             String headerUrl, MultipartUploadExtension.TestRequestBody json) 
throws IOException {
         MultipartBody.Builder builder = new MultipartBody.Builder();
@@ -227,6 +239,50 @@ class FileUploadHandlerITCase {
         verifyNoFileIsRegisteredToDeleteOnExitHook();
     }
 
+    /**
+     * This test checks for a specific multipart request chunk layout using a 
magic number.
+     *
+     * <p>These things are very susceptible to interference from other 
requests or parts of the
+     * payload; for example if the JSON payload increases by a single byte it 
can already break the
+     * number. Do not reuse the client.
+     *
+     * <p>To find the magic number you can define a static counter, and loop 
the test in the IDE
+     * (without forking!) while incrementing the counter on each run.
+     */
+    @Test
+    void testMixedMultipartEndOfDataDecoderExceptionHandling(@TempDir Path 
tmp) throws Exception {
+        OkHttpClient client = createOkHttpClientWithNoTimeouts();
+
+        MultipartUploadExtension.MultipartMixedHandler mixedHandler =
+                
multipartUpdateExtensionWrapper.getCustomExtension().getMixedHandler();
+
+        MultipartUploadExtension.TestRequestBody json =
+                new MultipartUploadExtension.TestRequestBody();
+
+        File file = TempDirUtils.newFile(tmp);
+        try (RandomAccessFile rw = new RandomAccessFile(file, "rw")) {
+            // magic file length (in bytes) that reliably reproduced EndOfDataDecoderException 
in hasNext()
+            rw.setLength(1424);
+        }
+        multipartUpdateExtensionWrapper
+                .getCustomExtension()
+                .setFileUploadVerifier(
+                        (handlerRequest, restfulGateway) ->
+                                
MultipartUploadExtension.assertUploadedFilesEqual(
+                                        handlerRequest, 
Collections.singleton(file)));
+
+        Request singleFileMixedRequest =
+                buildMixedRequest(
+                        
mixedHandler.getMessageHeaders().getTargetRestEndpointURL(), json, file);
+        try (Response response = 
client.newCall(singleFileMixedRequest).execute()) {
+            assertThat(response.code())
+                    
.isEqualTo(mixedHandler.getMessageHeaders().getResponseStatusCode().code());
+            assertThat(mixedHandler.lastReceivedRequest).isEqualTo(json);
+        }
+
+        verifyNoFileIsRegisteredToDeleteOnExitHook();
+    }
+
     @Test
     void testJsonMultipart() throws Exception {
         OkHttpClient client = createOkHttpClientWithNoTimeouts();

Reply via email to