Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/6147#discussion_r195062578
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java ---
@@ -183,6 +183,21 @@ public void shutdown(Time timeout) {
}
private <P extends ResponseBody> CompletableFuture<P>
submitRequest(String targetAddress, int targetPort, FullHttpRequest
httpRequest, JavaType responseType) {
+ return createChannelFuture(targetAddress, targetPort)
+ .thenComposeAsync(
+ channel -> {
+ ClientHandler handler =
channel.pipeline().get(ClientHandler.class);
+ CompletableFuture<JsonResponse> future
= handler.getJsonFuture();
+ channel.writeAndFlush(httpRequest);
+ return future;
+ },
+ executor)
+ .thenComposeAsync(
+ (JsonResponse rawResponse) ->
parseResponse(rawResponse, responseType),
+ executor);
+ }
+
+ private CompletableFuture<Channel> createChannelFuture(String
targetAddress, int targetPort) {
--- End diff --
Maybe rename to `openConnection` or `connectTo`.
---