Github user kl0u commented on a diff in the pull request:
https://github.com/apache/flink/pull/5062#discussion_r154884801
--- Diff:
flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
---
@@ -251,34 +263,81 @@ private boolean attemptToBind(final int port) throws Throwable {
throw future.cause();
} catch (BindException e) {
log.debug("Failed to start {} on port {}: {}.",
serverName, port, e.getMessage());
- shutdown();
+ try {
+ shutdownServer(Time.seconds(10L)).get();
+ } catch (Exception r) {
+
+ // Here we were seeing this problem:
+ // https://github.com/netty/netty/issues/4357 if we do a get().
+ // this is why we now simply wait a bit so that everything is
+ // shut down and then we check
+
+ log.warn("Problem while shutting down {}: {}", serverName, r.getMessage());
+ }
}
// any other type of exception we let it bubble up.
return false;
}
/**
* Shuts down the server and all related thread pools.
+ * @param timeout The time to wait for the shutdown process to complete.
+ * @return A {@link CompletableFuture} that will be completed upon termination of the shutdown process.
*/
- public void shutdown() {
- log.info("Shutting down {} @ {}", serverName, serverAddress);
-
- if (handler != null) {
- handler.shutdown();
- handler = null;
- }
-
- if (queryExecutor != null) {
- queryExecutor.shutdown();
- }
+ public CompletableFuture<Void> shutdownServer(Time timeout) throws InterruptedException {
+ CompletableFuture<Void> shutdownFuture = new CompletableFuture<>();
+ if (serverShutdownFuture.compareAndSet(null, shutdownFuture)) {
--- End diff --
The idea here is that the first call atomically installs the future
(`serverShutdownFuture.compareAndSet(null, shutdownFuture)`), and every
subsequent call returns that same future (`serverShutdownFuture.get()`). So
I do not see how this could lead to returning a future that never completes.
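
To make the argument concrete, here is a minimal, self-contained sketch of the pattern. It is not the actual `AbstractServerBase` code: the class `ShutdownOnceSketch`, the `shutdownRuns` counter and the no-argument `shutdownServer()` are hypothetical, and the real shutdown work is replaced by completing the future immediately.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical stand-in for the server; only the CAS pattern mirrors the diff. */
class ShutdownOnceSketch {

    // Holds the single shutdown future; stays null until the first shutdown call.
    private final AtomicReference<CompletableFuture<Void>> serverShutdownFuture =
            new AtomicReference<>(null);

    // Counts how often the shutdown work was started (illustration only).
    private final AtomicInteger shutdownRuns = new AtomicInteger();

    CompletableFuture<Void> shutdownServer() {
        CompletableFuture<Void> shutdownFuture = new CompletableFuture<>();
        if (serverShutdownFuture.compareAndSet(null, shutdownFuture)) {
            // Only the caller that wins the CAS gets here, so the work runs once.
            shutdownRuns.incrementAndGet();
            // Assumption: the real code would complete this future once the
            // handler and thread pools have terminated.
            shutdownFuture.complete(null);
        }
        // Winner and losers alike return the future that was installed first.
        return serverShutdownFuture.get();
    }

    int getShutdownRuns() {
        return shutdownRuns.get();
    }

    public static void main(String[] args) {
        ShutdownOnceSketch server = new ShutdownOnceSketch();
        CompletableFuture<Void> first = server.shutdownServer();
        CompletableFuture<Void> second = server.shutdownServer();
        System.out.println("same future: " + (first == second));
        System.out.println("shutdown runs: " + server.getShutdownRuns());
    }
}
```

In this sketch the caller that wins the `compareAndSet` completes the future synchronously, so every later caller gets a future that is already done. The guarantee only carries over to the real method if the winning caller completes the installed future on every path, including failures.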
---