Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6147#discussion_r195062578
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java ---
    @@ -183,6 +183,21 @@ public void shutdown(Time timeout) {
        }
     
        private <P extends ResponseBody> CompletableFuture<P> 
submitRequest(String targetAddress, int targetPort, FullHttpRequest 
httpRequest, JavaType responseType) {
    +           return createChannelFuture(targetAddress, targetPort)
    +                   .thenComposeAsync(
    +                           channel -> {
    +                                   ClientHandler handler = 
channel.pipeline().get(ClientHandler.class);
    +                                   CompletableFuture<JsonResponse> future 
= handler.getJsonFuture();
    +                                   channel.writeAndFlush(httpRequest);
    +                                   return future;
    +                           },
    +                           executor)
    +                   .thenComposeAsync(
    +                           (JsonResponse rawResponse) -> 
parseResponse(rawResponse, responseType),
    +                           executor);
    +   }
    +
    +   private CompletableFuture<Channel> createChannelFuture(String 
targetAddress, int targetPort) {
    --- End diff --
    
    Maybe rename to `openConnection` or `connectTo`.


---

Reply via email to