GitHub user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/5062#discussion_r153801913
--- Diff:
flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
---
@@ -422,20 +467,31 @@ void close() {
* @param cause The cause to close the channel with.
* @return Channel close future
*/
- private boolean close(Throwable cause) {
- if (failureCause.compareAndSet(null, cause)) {
- channel.close();
- stats.reportInactiveConnection();
+ private CompletableFuture<Void> close(Throwable cause) {
+ final CompletableFuture<Void> shutdownFuture = new CompletableFuture<>();
- for (long requestId : pendingRequests.keySet()) {
- TimestampedCompletableFuture pending = pendingRequests.remove(requestId);
- if (pending != null && pending.completeExceptionally(cause)) {
- stats.reportFailedRequest();
+ if (connectionShutdownFuture.compareAndSet(null, shutdownFuture) &&
--- End diff --
Let's also log other shutdown attempts as DEBUG.
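A rough sketch of what this could look like, not the actual change. The class name `ShutdownSketch`, the `ignoredAttempts` counter, and the use of `java.util.logging` (with `Level.FINE` standing in for DEBUG) are assumptions made to keep the example self-contained. The channel close and pending-request cleanup are reduced to a comment, and the rest of the condition after `&&` in the diff is not reproduced.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical stand-in for the connection class; not the Flink code. */
class ShutdownSketch {

    private static final Logger LOG = Logger.getLogger(ShutdownSketch.class.getName());

    /** Set exactly once, by the first caller that wins the compareAndSet. */
    private final AtomicReference<CompletableFuture<Void>> connectionShutdownFuture =
            new AtomicReference<>(null);

    /** Illustration only: counts the shutdown attempts that lost the race. */
    private final AtomicInteger ignoredAttempts = new AtomicInteger();

    CompletableFuture<Void> close(Throwable cause) {
        final CompletableFuture<Void> shutdownFuture = new CompletableFuture<>();

        if (connectionShutdownFuture.compareAndSet(null, shutdownFuture)) {
            // First attempt: the real code would close the channel and fail
            // the pending requests here before completing the future.
            shutdownFuture.complete(null);
            return shutdownFuture;
        }

        // Any later attempt: do no work, but leave a trace at debug level.
        ignoredAttempts.incrementAndGet();
        LOG.log(Level.FINE, "Ignoring shutdown attempt; connection is already shutting down.", cause);
        return connectionShutdownFuture.get();
    }

    int ignoredAttempts() {
        return ignoredAttempts.get();
    }
}
```

Returning the already-registered future from the losing branch is one possible choice; it lets every caller wait on the same shutdown.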
---