Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6147#discussion_r195077673 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java --- @@ -197,6 +208,88 @@ public void shutdown(Time timeout) { executor); } + public <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRequest( + String targetAddress, + int targetPort, + M messageHeaders, + U messageParameters, + R request, + Collection<Path> jars, + Collection<Path> userArtifacts) throws IOException { + Preconditions.checkNotNull(targetAddress); + Preconditions.checkArgument(0 <= targetPort && targetPort < 65536, "The target port " + targetPort + " is not in the range (0, 65536]."); + Preconditions.checkNotNull(messageHeaders); + Preconditions.checkNotNull(request); + Preconditions.checkNotNull(messageParameters); + Preconditions.checkState(messageParameters.isResolved(), "Message parameters were not resolved."); + + String targetUrl = MessageParameters.resolveUrl(messageHeaders.getTargetRestEndpointURL(), messageParameters); + + LOG.debug("Sending request of class {} to {}:{}{}", request.getClass(), targetAddress, targetPort, targetUrl); + // serialize payload + StringWriter sw = new StringWriter(); + objectMapper.writeValue(sw, request); + ByteBuf payload = Unpooled.wrappedBuffer(sw.toString().getBytes(ConfigConstants.DEFAULT_CHARSET)); + + // do not load file into memory, this can have weird side-effects and break functionality + HttpDataFactory factory = new DefaultHttpDataFactory(true); + + HttpRequest httpRequest = new DefaultHttpRequest(HttpVersion.HTTP_1_1, messageHeaders.getHttpMethod().getNettyHttpMethod(), messageHeaders.getTargetRestEndpointURL()); + httpRequest.headers() + .set(HttpHeaders.Names.HOST, targetAddress + ':' + targetPort) + .set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE); + + // takes care of splitting the request into multiple parts + HttpPostRequestEncoder bodyRequestEncoder; + try { + bodyRequestEncoder = new HttpPostRequestEncoder(factory, httpRequest, true); + + Attribute requestAttribute = new MemoryAttribute(FileUploadHandler.HTTP_ATTRIBUTE_REQUEST); + requestAttribute.setContent(payload); + bodyRequestEncoder.addBodyHttpData(requestAttribute); + + addPathsToEncoder(jars, FileUploadHandler.HTTP_ATTRIBUTE_JARS, RestConstants.JAR_CONTENT_TYPE, bodyRequestEncoder); + addPathsToEncoder(userArtifacts, FileUploadHandler.HTTP_ATTRIBUTE_ARTIFACTS, RestConstants.BINARY_CONTENT_TYPE, bodyRequestEncoder); --- End diff -- If we can send arbitrary files to the server and let the respective handler make sense of what is in what file, then we would also not need to introduce the different attributes.
---