Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/6147#discussion_r195075314
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java ---
@@ -197,6 +208,88 @@ public void shutdown(Time timeout) {
executor);
}
+ public <M extends MessageHeaders<R, P, U>, U extends MessageParameters,
R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRequest(
+ String targetAddress,
+ int targetPort,
+ M messageHeaders,
+ U messageParameters,
+ R request,
+ Collection<Path> jars,
+ Collection<Path> userArtifacts) throws IOException {
+ Preconditions.checkNotNull(targetAddress);
+ Preconditions.checkArgument(0 <= targetPort && targetPort <
65536, "The target port " + targetPort + " is not in the range (0, 65536].");
+ Preconditions.checkNotNull(messageHeaders);
+ Preconditions.checkNotNull(request);
+ Preconditions.checkNotNull(messageParameters);
+ Preconditions.checkState(messageParameters.isResolved(),
"Message parameters were not resolved.");
+
+ String targetUrl =
MessageParameters.resolveUrl(messageHeaders.getTargetRestEndpointURL(),
messageParameters);
+
+ LOG.debug("Sending request of class {} to {}:{}{}",
request.getClass(), targetAddress, targetPort, targetUrl);
+ // serialize payload
+ StringWriter sw = new StringWriter();
+ objectMapper.writeValue(sw, request);
+ ByteBuf payload =
Unpooled.wrappedBuffer(sw.toString().getBytes(ConfigConstants.DEFAULT_CHARSET));
+
+ // do not load file into memory, this can have weird
side-effects and break functionality
+ HttpDataFactory factory = new DefaultHttpDataFactory(true);
+
+ HttpRequest httpRequest = new
DefaultHttpRequest(HttpVersion.HTTP_1_1,
messageHeaders.getHttpMethod().getNettyHttpMethod(),
messageHeaders.getTargetRestEndpointURL());
+ httpRequest.headers()
+ .set(HttpHeaders.Names.HOST, targetAddress + ':' +
targetPort)
+ .set(HttpHeaders.Names.CONNECTION,
HttpHeaders.Values.CLOSE);
+
+ // takes care of splitting the request into multiple parts
+ HttpPostRequestEncoder bodyRequestEncoder;
+ try {
+ bodyRequestEncoder = new
HttpPostRequestEncoder(factory, httpRequest, true);
+
+ Attribute requestAttribute = new
MemoryAttribute(FileUploadHandler.HTTP_ATTRIBUTE_REQUEST);
+ requestAttribute.setContent(payload);
+ bodyRequestEncoder.addBodyHttpData(requestAttribute);
+
+ addPathsToEncoder(jars,
FileUploadHandler.HTTP_ATTRIBUTE_JARS, RestConstants.JAR_CONTENT_TYPE,
bodyRequestEncoder);
+ addPathsToEncoder(userArtifacts,
FileUploadHandler.HTTP_ATTRIBUTE_ARTIFACTS, RestConstants.BINARY_CONTENT_TYPE,
bodyRequestEncoder);
+
+ bodyRequestEncoder.finalizeRequest();
--- End diff --
If it's not a multi-part request, then we should send the `HttpRequest`
which is returned here.
---