Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6178#discussion_r196868266
  
    --- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java
 ---
    @@ -0,0 +1,483 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest;
    +
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.RestOptions;
    +import org.apache.flink.configuration.WebOptions;
    +import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
    +import org.apache.flink.runtime.rest.handler.HandlerRequest;
    +import org.apache.flink.runtime.rest.handler.RestHandlerException;
    +import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
    +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
    +import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
    +import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
    +import org.apache.flink.runtime.rest.messages.MessageHeaders;
    +import org.apache.flink.runtime.rest.messages.RequestBody;
    +import org.apache.flink.runtime.rest.util.RestMapperUtils;
    +import org.apache.flink.runtime.rpc.RpcUtils;
    +import org.apache.flink.runtime.webmonitor.RestfulGateway;
    +import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
    +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
    +import org.apache.flink.util.TestLogger;
    +
    +import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
    +import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
    +import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
    +import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
    +import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
    +
    +import okhttp3.MediaType;
    +import okhttp3.MultipartBody;
    +import okhttp3.OkHttpClient;
    +import okhttp3.Request;
    +import okhttp3.Response;
    +import org.junit.AfterClass;
    +import org.junit.BeforeClass;
    +import org.junit.ClassRule;
    +import org.junit.Test;
    +import org.junit.rules.TemporaryFolder;
    +
    +import javax.annotation.Nonnull;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.io.StringWriter;
    +import java.nio.file.Files;
    +import java.nio.file.Path;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Random;
    +import java.util.concurrent.CompletableFuture;
    +
    +import static java.util.Objects.requireNonNull;
    +import static org.junit.Assert.assertArrayEquals;
    +import static org.junit.Assert.assertEquals;
    +
    +/**
    + * Tests for the {@link FileUploadHandler}. Ensures that multipart http 
messages containing files and/or json are properly
    + * handled.
    + */
    +public class FileUploadHandlerTest extends TestLogger {
    +
    +   private static final ObjectMapper OBJECT_MAPPER = 
RestMapperUtils.getStrictObjectMapper();
    +   private static final Random RANDOM = new Random();
    +
    +   @ClassRule
    +   public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
    +
    +   private static RestServerEndpoint serverEndpoint;
    +   private static String serverAddress;
    +
    +   private static MultipartMixedHandler mixedHandler;
    +   private static MultipartJsonHandler jsonHandler;
    +   private static MultipartFileHandler fileHandler;
    +   private static File file1;
    +   private static File file2;
    +
    +   @BeforeClass
    +   public static void setup() throws Exception {
    +           Configuration config = new Configuration();
    +           config.setInteger(RestOptions.PORT, 0);
    +           config.setString(RestOptions.ADDRESS, "localhost");
    +           config.setString(WebOptions.UPLOAD_DIR, 
TEMPORARY_FOLDER.newFolder().getCanonicalPath());
    +
    +           RestServerEndpointConfiguration serverConfig = 
RestServerEndpointConfiguration.fromConfiguration(config);
    +
    +           final String restAddress = "http://localhost:1234";;
    +           RestfulGateway mockRestfulGateway = 
TestingRestfulGateway.newBuilder()
    +                   .setRestAddress(restAddress)
    +                   .build();
    +
    +           final GatewayRetriever<RestfulGateway> mockGatewayRetriever = 
() ->
    +                   CompletableFuture.completedFuture(mockRestfulGateway);
    +
    +           file1 = TEMPORARY_FOLDER.newFile();
    +           Files.write(file1.toPath(), 
"hello".getBytes(ConfigConstants.DEFAULT_CHARSET));
    +           file2 = TEMPORARY_FOLDER.newFile();
    +           Files.write(file2.toPath(), 
"world".getBytes(ConfigConstants.DEFAULT_CHARSET));
    +
    +           mixedHandler = new 
MultipartMixedHandler(CompletableFuture.completedFuture(restAddress), 
mockGatewayRetriever);
    +           jsonHandler = new 
MultipartJsonHandler(CompletableFuture.completedFuture(restAddress), 
mockGatewayRetriever);
    +           fileHandler = new 
MultipartFileHandler(CompletableFuture.completedFuture(restAddress), 
mockGatewayRetriever);
    +
    +           final List<Tuple2<RestHandlerSpecification, 
ChannelInboundHandler>> handlers = Arrays.asList(
    +                   Tuple2.of(mixedHandler.getMessageHeaders(), 
mixedHandler),
    +                   Tuple2.of(jsonHandler.getMessageHeaders(), jsonHandler),
    +                   Tuple2.of(fileHandler.getMessageHeaders(), 
fileHandler));
    +
    +           serverEndpoint = new TestRestServerEndpoint(serverConfig, 
handlers);
    +
    +           serverEndpoint.start();
    +           serverAddress = serverEndpoint.getRestBaseUrl();
    +   }
    +
    +   @AfterClass
    +   public static void teardown() throws Exception {
    +           if (serverEndpoint != null) {
    +                   serverEndpoint.close();
    +                   serverEndpoint = null;
    +           }
    +   }
    +
    +   private static Request buildFileRequest(String headerUrl) {
    +           MultipartBody.Builder builder = new MultipartBody.Builder();
    +           builder = addFilePart(builder);
    +           return finalizeRequest(builder, headerUrl);
    +   }
    +
    +   private static Request buildJsonRequest(String headerUrl, int index) 
throws IOException {
    +           MultipartBody.Builder builder = new MultipartBody.Builder();
    +           builder = addJsonPart(builder, index);
    +           return finalizeRequest(builder, headerUrl);
    +   }
    +
    +   private static Request buildMixedRequest(String headerUrl, int index) 
throws IOException {
    +           MultipartBody.Builder builder = new MultipartBody.Builder();
    +           builder = addJsonPart(builder, index);
    +           builder = addFilePart(builder);
    +           return finalizeRequest(builder, headerUrl);
    +   }
    +
    +   private static Request finalizeRequest(MultipartBody.Builder builder, 
String headerUrl) {
    +           MultipartBody multipartBody = builder
    +                   .setType(MultipartBody.FORM)
    +                   .build();
    +
    +           return new Request.Builder()
    +                   .url(serverAddress + headerUrl)
    +                   .post(multipartBody)
    +                   .build();
    +   }
    +
    +   private static MultipartBody.Builder addFilePart(MultipartBody.Builder 
builder) {
    +           okhttp3.RequestBody filePayload1 = 
okhttp3.RequestBody.create(MediaType.parse("application/octet-stream"), file1);
    +           okhttp3.RequestBody filePayload2 = 
okhttp3.RequestBody.create(MediaType.parse("application/octet-stream"), file2);
    +
    +           return builder.addFormDataPart("file1", file1.getName(), 
filePayload1)
    +                   .addFormDataPart("file2", file2.getName(), 
filePayload2);
    +   }
    +
    +   private static MultipartBody.Builder addJsonPart(MultipartBody.Builder 
builder, int index) throws IOException {
    +           TestRequestBody jsonRequestBody = new TestRequestBody(index);
    +
    +           StringWriter sw = new StringWriter();
    +           OBJECT_MAPPER.writeValue(sw, jsonRequestBody);
    +
    +           String jsonPayload = sw.toString();
    +
    +           return 
builder.addFormDataPart(org.apache.flink.runtime.rest.FileUploadHandler.HTTP_ATTRIBUTE_REQUEST,
 jsonPayload);
    +   }
    +
    +   @Test
    +   public void testMixedMultipart() throws Exception {
    +           OkHttpClient client = new OkHttpClient();
    +
    +           Request jsonRequest = 
buildJsonRequest(mixedHandler.getMessageHeaders().getTargetRestEndpointURL(), 
RANDOM.nextInt());
    +           try (Response response = client.newCall(jsonRequest).execute()) 
{
    +                   // explicitly rejected by the test handler 
implementation
    +                   
assertEquals(HttpResponseStatus.INTERNAL_SERVER_ERROR.code(), response.code());
    +           }
    +
    +           Request fileRequest = 
buildFileRequest(mixedHandler.getMessageHeaders().getTargetRestEndpointURL());
    +           try (Response response = client.newCall(fileRequest).execute()) 
{
    +                   // expected JSON payload is missing
    +                   assertEquals(HttpResponseStatus.BAD_REQUEST.code(), 
response.code());
    +           }
    +
    +           int mixedId = RANDOM.nextInt();
    +           Request mixedRequest = 
buildMixedRequest(mixedHandler.getMessageHeaders().getTargetRestEndpointURL(), 
mixedId);
    +           try (Response response = 
client.newCall(mixedRequest).execute()) {
    +                   
assertEquals(mixedHandler.getMessageHeaders().getResponseStatusCode().code(), 
response.code());
    +                   assertEquals(mixedId, 
mixedHandler.lastReceivedRequest.index);
    +           }
    +   }
    +
    +   @Test
    +   public void testJsonMultipart() throws Exception {
    +           OkHttpClient client = new OkHttpClient();
    +
    +           int jsonId = RANDOM.nextInt();
    +           Request jsonRequest = 
buildJsonRequest(jsonHandler.getMessageHeaders().getTargetRestEndpointURL(), 
jsonId);
    +           try (Response response = client.newCall(jsonRequest).execute()) 
{
    +                   
assertEquals(jsonHandler.getMessageHeaders().getResponseStatusCode().code(), 
response.code());
    +                   assertEquals(jsonId, 
jsonHandler.lastReceivedRequest.index);
    +           }
    +
    +           Request fileRequest = 
buildFileRequest(jsonHandler.getMessageHeaders().getTargetRestEndpointURL());
    +           try (Response response = client.newCall(fileRequest).execute()) 
{
    +                   // either because JSON payload is missing or 
FileUploads are outright forbidden
    +                   assertEquals(HttpResponseStatus.BAD_REQUEST.code(), 
response.code());
    +           }
    +
    +           Request mixedRequest = 
buildMixedRequest(jsonHandler.getMessageHeaders().getTargetRestEndpointURL(), 
RANDOM.nextInt());
    +           try (Response response = 
client.newCall(mixedRequest).execute()) {
    +                   // FileUploads are outright forbidden
    +                   assertEquals(HttpResponseStatus.BAD_REQUEST.code(), 
response.code());
    +           }
    +   }
    +
    +   @Test
    +   public void testFileMultipart() throws Exception {
    +           OkHttpClient client = new OkHttpClient();
    +
    +           Request jsonRequest = 
buildJsonRequest(fileHandler.getMessageHeaders().getTargetRestEndpointURL(), 
RANDOM.nextInt());
    +           try (Response response = client.newCall(jsonRequest).execute()) 
{
    +                   // JSON payload did not match expected format
    +                   assertEquals(HttpResponseStatus.BAD_REQUEST.code(), 
response.code());
    +           }
    +
    +           Request fileRequest = 
buildFileRequest(fileHandler.getMessageHeaders().getTargetRestEndpointURL());
    +           try (Response response = client.newCall(fileRequest).execute()) 
{
    +                   
assertEquals(fileHandler.getMessageHeaders().getResponseStatusCode().code(), 
response.code());
    +           }
    +
    +           Request mixedRequest = 
buildMixedRequest(fileHandler.getMessageHeaders().getTargetRestEndpointURL(), 
RANDOM.nextInt());
    +           try (Response response = 
client.newCall(mixedRequest).execute()) {
    +                   // JSON payload did not match expected format
    +                   assertEquals(HttpResponseStatus.BAD_REQUEST.code(), 
response.code());
    +           }
    +   }
    --- End diff --
    
    the underlying issue is that we don't have unit test for `AbstractHandler` 
and `AbstractRestHandler`.


---

Reply via email to