Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6147#discussion_r195077673
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java ---
    @@ -197,6 +208,88 @@ public void shutdown(Time timeout) {
                                executor);
        }
     
    +   public <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRequest(
    +                   String targetAddress,
    +                   int targetPort,
    +                   M messageHeaders,
    +                   U messageParameters,
    +                   R request,
    +                   Collection<Path> jars,
    +                   Collection<Path> userArtifacts) throws IOException {
    +           Preconditions.checkNotNull(targetAddress);
    +           Preconditions.checkArgument(0 <= targetPort && targetPort < 65536, "The target port " + targetPort + " is not in the range (0, 65536].");
    +           Preconditions.checkNotNull(messageHeaders);
    +           Preconditions.checkNotNull(request);
    +           Preconditions.checkNotNull(messageParameters);
    +           Preconditions.checkState(messageParameters.isResolved(), "Message parameters were not resolved.");
    +
    +           String targetUrl = MessageParameters.resolveUrl(messageHeaders.getTargetRestEndpointURL(), messageParameters);
    +
    +           LOG.debug("Sending request of class {} to {}:{}{}", request.getClass(), targetAddress, targetPort, targetUrl);
    +           // serialize payload
    +           StringWriter sw = new StringWriter();
    +           objectMapper.writeValue(sw, request);
    +           ByteBuf payload = Unpooled.wrappedBuffer(sw.toString().getBytes(ConfigConstants.DEFAULT_CHARSET));
    +
    +           // do not load file into memory, this can have weird side-effects and break functionality
    +           HttpDataFactory factory = new DefaultHttpDataFactory(true);
    +
    +           HttpRequest httpRequest = new DefaultHttpRequest(HttpVersion.HTTP_1_1, messageHeaders.getHttpMethod().getNettyHttpMethod(), messageHeaders.getTargetRestEndpointURL());
    +           httpRequest.headers()
    +                   .set(HttpHeaders.Names.HOST, targetAddress + ':' + targetPort)
    +                   .set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
    +
    +           // takes care of splitting the request into multiple parts
    +           HttpPostRequestEncoder bodyRequestEncoder;
    +           try {
    +                   bodyRequestEncoder = new HttpPostRequestEncoder(factory, httpRequest, true);
    +
    +                   Attribute requestAttribute = new MemoryAttribute(FileUploadHandler.HTTP_ATTRIBUTE_REQUEST);
    +                   requestAttribute.setContent(payload);
    +                   bodyRequestEncoder.addBodyHttpData(requestAttribute);
    +
    +                   addPathsToEncoder(jars, FileUploadHandler.HTTP_ATTRIBUTE_JARS, RestConstants.JAR_CONTENT_TYPE, bodyRequestEncoder);
    +                   addPathsToEncoder(userArtifacts, FileUploadHandler.HTTP_ATTRIBUTE_ARTIFACTS, RestConstants.BINARY_CONTENT_TYPE, bodyRequestEncoder);
    --- End diff --
    
    If we could send arbitrary files to the server and let the respective handler work out what each file contains, we would not need to introduce the different attributes either.
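
    To illustrate the idea, here is a minimal sketch of how a handler might sort a single, undifferentiated collection of uploaded files on its own. `UploadedFiles`, `Kind`, `classify` and `groupByKind` are hypothetical names that do not exist in Flink, and recognizing jars by their `.jar` suffix is only an assumption; a real handler could equally rely on information in the request body.

```java
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

/** Hypothetical helper (not part of Flink): lets the handler sort uploaded files itself. */
final class UploadedFiles {

    /** Kinds a handler might distinguish; the names are illustrative only. */
    enum Kind { JAR, ARTIFACT }

    private UploadedFiles() {}

    /** Assumption: a case-insensitive ".jar" suffix is enough to recognize a jar. */
    static Kind classify(Path file) {
        Path name = file.getFileName();
        boolean isJar = name != null
                && name.toString().toLowerCase(Locale.ROOT).endsWith(".jar");
        return isJar ? Kind.JAR : Kind.ARTIFACT;
    }

    /** Groups all uploaded files by kind, keeping upload order within each group. */
    static Map<Kind, List<Path>> groupByKind(Collection<Path> uploadedFiles) {
        Map<Kind, List<Path>> grouped = new EnumMap<>(Kind.class);
        for (Kind kind : Kind.values()) {
            grouped.put(kind, new ArrayList<>());
        }
        for (Path file : uploadedFiles) {
            grouped.get(classify(file)).add(file);
        }
        return grouped;
    }
}
```

    With something along these lines, the client could add every file under one multipart attribute, and `sendRequest` would only need a single `Collection<Path>` parameter instead of separate `jars` and `userArtifacts`.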

