[
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16517493#comment-16517493
]
ASF GitHub Bot commented on FLINK-9599:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/6178#discussion_r196560163
--- Diff:
flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java
---
@@ -0,0 +1,471 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.FileUploads;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.util.RestMapperUtils;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.Preconditions;
+
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+import
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import okhttp3.MediaType;
+import okhttp3.MultipartBody;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+
+import static java.util.Objects.requireNonNull;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link FileUploadHandler}. Ensures that multipart http
messages containing files and/or json are properly
+ * handled.
+ */
+public class FileUploadHandlerTest {
+
+ private static final ObjectMapper OBJECT_MAPPER =
RestMapperUtils.getStrictObjectMapper();
+ private static final Random RANDOM = new Random();
+
+ @ClassRule
+ public static final TemporaryFolder TEMPORARY_FOLDER = new
TemporaryFolder();
+
+ private static RestServerEndpoint serverEndpoint;
+ private static String serverAddress;
+
+ private static MultipartMixedHandler mixedHandler;
+ private static MultipartJsonHandler jsonHandler;
+ private static MultipartFileHandler fileHandler;
+ private static File file1;
+ private static File file2;
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ Configuration config = new Configuration();
+ config.setInteger(RestOptions.PORT, 0);
+ config.setString(RestOptions.ADDRESS, "localhost");
+ config.setString(WebOptions.UPLOAD_DIR,
TEMPORARY_FOLDER.newFolder().getCanonicalPath());
+
+ RestServerEndpointConfiguration serverConfig =
RestServerEndpointConfiguration.fromConfiguration(config);
+
+ final String restAddress = "http://localhost:1234";
+ RestfulGateway mockRestfulGateway = mock(RestfulGateway.class);
+
when(mockRestfulGateway.requestRestAddress(any(Time.class))).thenReturn(CompletableFuture.completedFuture(restAddress));
+
+ final GatewayRetriever<RestfulGateway> mockGatewayRetriever =
() ->
+ CompletableFuture.completedFuture(mockRestfulGateway);
+
+ file1 = TEMPORARY_FOLDER.newFile();
+ Files.write(file1.toPath(),
"hello".getBytes(ConfigConstants.DEFAULT_CHARSET));
+ file2 = TEMPORARY_FOLDER.newFile();
+ Files.write(file2.toPath(),
"world".getBytes(ConfigConstants.DEFAULT_CHARSET));
+
+ mixedHandler = new
MultipartMixedHandler(CompletableFuture.completedFuture(restAddress),
mockGatewayRetriever);
+ jsonHandler = new
MultipartJsonHandler(CompletableFuture.completedFuture(restAddress),
mockGatewayRetriever);
+ fileHandler = new
MultipartFileHandler(CompletableFuture.completedFuture(restAddress),
mockGatewayRetriever);
+
+ final List<Tuple2<RestHandlerSpecification,
ChannelInboundHandler>> handlers = Arrays.asList(
+ Tuple2.of(mixedHandler.getMessageHeaders(),
mixedHandler),
+ Tuple2.of(jsonHandler.getMessageHeaders(), jsonHandler),
+ Tuple2.of(fileHandler.getMessageHeaders(),
fileHandler));
+
+ serverEndpoint = new TestRestServerEndpoint(serverConfig,
handlers);
+
+ serverEndpoint.start();
+ serverAddress = serverEndpoint.getRestBaseUrl();
+ }
+
+ @AfterClass
+ public static void teardown() throws Exception {
+ if (serverEndpoint != null) {
+ serverEndpoint.close();
+ serverEndpoint = null;
+ }
+ }
+
+ private static Request buildRequest(String headerUrl, int index,
boolean includeFile, boolean includeJson) throws IOException {
+ Preconditions.checkArgument(includeFile || includeJson, "You
have to either include JSON or a file.");
+ MultipartBody.Builder builder = new MultipartBody.Builder();
+
+ if (includeFile) {
+ okhttp3.RequestBody filePayload1 =
okhttp3.RequestBody.create(MediaType.parse("application/octet-stream"), file1);
+ builder = builder.addFormDataPart("file1",
file1.getName(), filePayload1);
+
+ okhttp3.RequestBody filePayload2 =
okhttp3.RequestBody.create(MediaType.parse("application/octet-stream"), file2);
+ builder = builder.addFormDataPart("file2",
file2.getName(), filePayload2);
+ }
+
+ if (includeJson) {
+ TestRequestBody jsonRequestBody = new
TestRequestBody(index);
+
+ StringWriter sw = new StringWriter();
+ OBJECT_MAPPER.writeValue(sw, jsonRequestBody);
+
+ String jsonPayload = sw.toString();
+
+ builder =
builder.addFormDataPart(org.apache.flink.runtime.rest.FileUploadHandler.HTTP_ATTRIBUTE_REQUEST,
jsonPayload);
+ }
+
+ MultipartBody multipartBody = builder
+ .setType(MultipartBody.FORM)
+ .build();
+
+ return new Request.Builder()
+ .url(serverAddress + headerUrl)
+ .post(multipartBody)
+ .build();
+ }
+
+ @Test
+ public void testMixedMultipart() throws Exception {
+ OkHttpClient client = new OkHttpClient();
+
+ Request jsonRequest =
buildRequest(mixedHandler.getMessageHeaders().getTargetRestEndpointURL(),
RANDOM.nextInt(), false, true);
--- End diff --
Would be good to have methods `buildFileRequest` and `buildJsonRequest` and
`buildJsonFileRequest` instead of the version with boolean. Booleans make it
harder to understand which request is generated.
> Implement generic mechanism to receive files via rest
> -----------------------------------------------------
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
> Issue Type: New Feature
> Components: REST
> Reporter: Chesnay Schepler
> Assignee: Chesnay Schepler
> Priority: Major
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
> * extend the RestClient to allow the upload of Files
> * extend FileUploadHandler to accept mixed multi-part requests (json + files)
> * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute,
> similar to the existing special case for the {{JarUploadHandler}}. The JSON
> body can be forwarded by replacing the incoming http requests with a simple
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)