Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5062#discussion_r154885293 --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java --- @@ -251,34 +263,81 @@ private boolean attemptToBind(final int port) throws Throwable { throw future.cause(); } catch (BindException e) { log.debug("Failed to start {} on port {}: {}.", serverName, port, e.getMessage()); - shutdown(); + try { + shutdownServer(Time.seconds(10L)).get(); + } catch (Exception r) { + + // Here we were seeing this problem: + // https://github.com/netty/netty/issues/4357 if we do a get(). + // this is why we now simply wait a bit so that everything is + // shut down and then we check + + log.warn("Problem while shutting down {}: {}", serverName, r.getMessage()); + } } // any other type of exception we let it bubble up. return false; } /** * Shuts down the server and all related thread pools. + * @param timeout The time to wait for the shutdown process to complete. + * @return A {@link CompletableFuture} that will be completed upon termination of the shutdown process. 
*/ - public void shutdown() { - log.info("Shutting down {} @ {}", serverName, serverAddress); - - if (handler != null) { - handler.shutdown(); - handler = null; - } - - if (queryExecutor != null) { - queryExecutor.shutdown(); - } + public CompletableFuture<Void> shutdownServer(Time timeout) throws InterruptedException { + CompletableFuture<Void> shutdownFuture = new CompletableFuture<>(); + if (serverShutdownFuture.compareAndSet(null, shutdownFuture)) { + log.info("Shutting down {} @ {}", serverName, serverAddress); + + final CompletableFuture<Void> groupShutdownFuture = new CompletableFuture<>(); + if (bootstrap != null) { + EventLoopGroup group = bootstrap.group(); + if (group != null && !group.isShutdown()) { + group.shutdownGracefully(0L, 0L, TimeUnit.MILLISECONDS) + .addListener(finished -> { + if (finished.isSuccess()) { + groupShutdownFuture.complete(null); + } else { + groupShutdownFuture.completeExceptionally(finished.cause()); + } + }); + } else { + groupShutdownFuture.complete(null); + } + } else { + groupShutdownFuture.complete(null); + } - if (bootstrap != null) { - EventLoopGroup group = bootstrap.group(); - if (group != null) { - group.shutdownGracefully(0L, 10L, TimeUnit.SECONDS); + final CompletableFuture<Void> handlerShutdownFuture = new CompletableFuture<>(); + if (handler == null) { + handlerShutdownFuture.complete(null); + } else { + handler.shutdown().whenComplete((result, throwable) -> { + if (throwable != null) { + handlerShutdownFuture.completeExceptionally(throwable); + } else { + handlerShutdownFuture.complete(null); + } + }); } + + final CompletableFuture<Void> queryExecShutdownFuture = CompletableFuture.runAsync(() -> { + if (queryExecutor != null) { + ExecutorUtils.gracefulShutdown(timeout.toMilliseconds(), TimeUnit.MILLISECONDS, queryExecutor); + } + }); + + CompletableFuture.allOf( + queryExecShutdownFuture, groupShutdownFuture, handlerShutdownFuture + ).whenComplete((result, throwable) -> { + if (throwable != null) { + 
shutdownFuture.completeExceptionally(throwable); + } else { + shutdownFuture.complete(null); + } + }); } - serverAddress = null; + return serverShutdownFuture.get(); --- End diff -- I do think so, because otherwise we could restart a server that we intentionally shut down in the past, right? (This only happens in start(), when we fail to bind to a specific port.)
---