I'm wondering...consider the `submitRequest` method from the `RestClient`:
```
private <P extends ResponseBody> CompletableFuture<P> submitRequest(String
targetAddress, int targetPort, Request httpRequest, JavaType responseType) {
final ChannelFuture connectFuture =
bootstrap.connect(targetAddress, targetPort);
final CompletableFuture<Channel> channelFuture = new
CompletableFuture<>();
connectFuture.addListener(
(ChannelFuture future) -> {
if (future.isSuccess()) {
channelFuture.complete(future.channel());
} else {
channelFuture.completeExceptionally(future.cause());
}
});
return channelFuture
.thenComposeAsync(
channel -> {
ClientHandler handler =
channel.pipeline().get(ClientHandler.class);
CompletableFuture<JsonResponse> future
= handler.getJsonFuture();
try {
httpRequest.writeTo(channel);
} catch (IOException e) {
return
FutureUtils.completedExceptionally(new FlinkException("Could not write
request.", e));
}
return future;
},
executor)
.thenComposeAsync(
(JsonResponse rawResponse) ->
parseResponse(rawResponse, responseType),
executor);
```
What happens if the client is shutdown, before the `thenComposeAsync` portions
have been run? Then the channelFuture is completed by the listener; and then
subsequent stages aren't executed?
[ Full content available at: https://github.com/apache/flink/pull/6763 ]
This message was relayed via gitbox.apache.org for [email protected]