Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6147#discussion_r195075314
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java ---
    @@ -197,6 +208,88 @@ public void shutdown(Time timeout) {
                                executor);
        }
     
    +   public <M extends MessageHeaders<R, P, U>, U extends MessageParameters, 
R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRequest(
    +                   String targetAddress,
    +                   int targetPort,
    +                   M messageHeaders,
    +                   U messageParameters,
    +                   R request,
    +                   Collection<Path> jars,
    +                   Collection<Path> userArtifacts) throws IOException {
    +           Preconditions.checkNotNull(targetAddress);
    +           Preconditions.checkArgument(0 <= targetPort && targetPort < 
65536, "The target port " + targetPort + " is not in the range (0, 65536].");
    +           Preconditions.checkNotNull(messageHeaders);
    +           Preconditions.checkNotNull(request);
    +           Preconditions.checkNotNull(messageParameters);
    +           Preconditions.checkState(messageParameters.isResolved(), 
"Message parameters were not resolved.");
    +
    +           String targetUrl = 
MessageParameters.resolveUrl(messageHeaders.getTargetRestEndpointURL(), 
messageParameters);
    +
    +           LOG.debug("Sending request of class {} to {}:{}{}", 
request.getClass(), targetAddress, targetPort, targetUrl);
    +           // serialize payload
    +           StringWriter sw = new StringWriter();
    +           objectMapper.writeValue(sw, request);
    +           ByteBuf payload = 
Unpooled.wrappedBuffer(sw.toString().getBytes(ConfigConstants.DEFAULT_CHARSET));
    +
    +           // do not load file into memory, this can have weird 
side-effects and break functionality
    +           HttpDataFactory factory = new DefaultHttpDataFactory(true);
    +
    +           HttpRequest httpRequest = new 
DefaultHttpRequest(HttpVersion.HTTP_1_1, 
messageHeaders.getHttpMethod().getNettyHttpMethod(), 
messageHeaders.getTargetRestEndpointURL());
    +           httpRequest.headers()
    +                   .set(HttpHeaders.Names.HOST, targetAddress + ':' + 
targetPort)
    +                   .set(HttpHeaders.Names.CONNECTION, 
HttpHeaders.Values.CLOSE);
    +
    +           // takes care of splitting the request into multiple parts
    +           HttpPostRequestEncoder bodyRequestEncoder;
    +           try {
    +                   bodyRequestEncoder = new 
HttpPostRequestEncoder(factory, httpRequest, true);
    +
    +                   Attribute requestAttribute = new 
MemoryAttribute(FileUploadHandler.HTTP_ATTRIBUTE_REQUEST);
    +                   requestAttribute.setContent(payload);
    +                   bodyRequestEncoder.addBodyHttpData(requestAttribute);
    +
    +                   addPathsToEncoder(jars, 
FileUploadHandler.HTTP_ATTRIBUTE_JARS, RestConstants.JAR_CONTENT_TYPE, 
bodyRequestEncoder);
    +                   addPathsToEncoder(userArtifacts, 
FileUploadHandler.HTTP_ATTRIBUTE_ARTIFACTS, RestConstants.BINARY_CONTENT_TYPE, 
bodyRequestEncoder);
    +
    +                   bodyRequestEncoder.finalizeRequest();
    --- End diff --
    
    If it's not a multi-part request, then we should send the `HttpRequest` 
which is returned here.


---

Reply via email to