GitHub user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/6147#discussion_r195077673
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java ---
@@ -197,6 +208,88 @@ public void shutdown(Time timeout) {
executor);
}
+ public <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRequest(
+ String targetAddress,
+ int targetPort,
+ M messageHeaders,
+ U messageParameters,
+ R request,
+ Collection<Path> jars,
+ Collection<Path> userArtifacts) throws IOException {
+ Preconditions.checkNotNull(targetAddress);
+ Preconditions.checkArgument(0 <= targetPort && targetPort < 65536, "The target port " + targetPort + " is not in the range [0, 65536).");
+ Preconditions.checkNotNull(messageHeaders);
+ Preconditions.checkNotNull(request);
+ Preconditions.checkNotNull(messageParameters);
+ Preconditions.checkState(messageParameters.isResolved(), "Message parameters were not resolved.");
+
+ String targetUrl = MessageParameters.resolveUrl(messageHeaders.getTargetRestEndpointURL(), messageParameters);
+
+ LOG.debug("Sending request of class {} to {}:{}{}", request.getClass(), targetAddress, targetPort, targetUrl);
+ // serialize payload
+ StringWriter sw = new StringWriter();
+ objectMapper.writeValue(sw, request);
+ ByteBuf payload = Unpooled.wrappedBuffer(sw.toString().getBytes(ConfigConstants.DEFAULT_CHARSET));
+
+ // do not load file into memory, this can have weird side-effects and break functionality
+ HttpDataFactory factory = new DefaultHttpDataFactory(true);
+
+ HttpRequest httpRequest = new DefaultHttpRequest(HttpVersion.HTTP_1_1, messageHeaders.getHttpMethod().getNettyHttpMethod(), messageHeaders.getTargetRestEndpointURL());
+ httpRequest.headers()
+ .set(HttpHeaders.Names.HOST, targetAddress + ':' + targetPort)
+ .set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
+
+ // takes care of splitting the request into multiple parts
+ HttpPostRequestEncoder bodyRequestEncoder;
+ try {
+ bodyRequestEncoder = new HttpPostRequestEncoder(factory, httpRequest, true);
+
+ Attribute requestAttribute = new MemoryAttribute(FileUploadHandler.HTTP_ATTRIBUTE_REQUEST);
+ requestAttribute.setContent(payload);
+ bodyRequestEncoder.addBodyHttpData(requestAttribute);
+
+ addPathsToEncoder(jars, FileUploadHandler.HTTP_ATTRIBUTE_JARS, RestConstants.JAR_CONTENT_TYPE, bodyRequestEncoder);
+ addPathsToEncoder(userArtifacts, FileUploadHandler.HTTP_ATTRIBUTE_ARTIFACTS, RestConstants.BINARY_CONTENT_TYPE, bodyRequestEncoder);
--- End diff --
If we could send arbitrary files to the server and let the respective handler
work out what each file contains, then we would also not need to introduce
separate attributes for jars and user artifacts.
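
To illustrate, here is a rough sketch of what a single generic upload list could look like on the client side. It is plain Java with no Netty or Flink dependencies, and every name in it (`GenericUploadSketch`, `FileUpload`, `Part`, `toParts`, the `file_N` attribute naming) is hypothetical and not part of the PR. The content type strings in the usage note are also only assumed examples.

```java
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

/** Hypothetical sketch; none of these names exist in Flink or in this PR. */
final class GenericUploadSketch {

    /** A file to upload together with the content type the caller declares for it. */
    static final class FileUpload {
        final Path file;
        final String contentType;

        FileUpload(Path file, String contentType) {
            this.file = file;
            this.contentType = contentType;
        }
    }

    /** Description of one multipart body part as it would be handed to an encoder. */
    static final class Part {
        final String attributeName;
        final String fileName;
        final String contentType;

        Part(String attributeName, String fileName, String contentType) {
            this.attributeName = attributeName;
            this.fileName = fileName;
            this.contentType = contentType;
        }
    }

    /**
     * Maps every upload to a part with a generic, index-based attribute name
     * (assumed scheme: file_0, file_1, ...). The client no longer distinguishes
     * jars from artifacts; the server-side handler interprets the files.
     */
    static List<Part> toParts(Collection<FileUpload> uploads) {
        List<Part> parts = new ArrayList<>();
        int index = 0;
        for (FileUpload upload : uploads) {
            parts.add(new Part(
                "file_" + index++,
                upload.file.getFileName().toString(),
                upload.contentType));
        }
        return parts;
    }
}
```

With something like this, `sendRequest` could take one `Collection<FileUpload>` instead of separate `jars` and `userArtifacts` parameters, and a single loop over `toParts(...)` would replace the two `addPathsToEncoder` calls. For example, passing `job.jar` with type `application/java-archive` and `model.bin` with type `application/octet-stream` would yield the parts `file_0` and `file_1`.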
---