Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5062#discussion_r154885293
  
    --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java ---
    @@ -251,34 +263,81 @@ private boolean attemptToBind(final int port) throws Throwable {
                        throw future.cause();
                } catch (BindException e) {
                        log.debug("Failed to start {} on port {}: {}.", serverName, port, e.getMessage());
    -                   shutdown();
    +                   try {
    +                           shutdownServer(Time.seconds(10L)).get();
    +                   } catch (Exception r) {
    +
    +                           // Here we were seeing this problem:
    +                           // https://github.com/netty/netty/issues/4357 if we do a get().
    +                           // this is why we now simply wait a bit so that everything is
    +                           // shut down and then we check
    +
    +                           log.warn("Problem while shutting down {}: {}", serverName, r.getMessage());
    +                   }
                }
                // any other type of exception we let it bubble up.
                return false;
        }
     
        /**
         * Shuts down the server and all related thread pools.
    +    * @param timeout The time to wait for the shutdown process to complete.
    +    * @return A {@link CompletableFuture} that will be completed upon termination of the shutdown process.
         */
    -   public void shutdown() {
    -           log.info("Shutting down {} @ {}", serverName, serverAddress);
    -
    -           if (handler != null) {
    -                   handler.shutdown();
    -                   handler = null;
    -           }
    -
    -           if (queryExecutor != null) {
    -                   queryExecutor.shutdown();
    -           }
    +   public CompletableFuture<Void> shutdownServer(Time timeout) throws InterruptedException {
    +           CompletableFuture<Void> shutdownFuture = new CompletableFuture<>();
    +           if (serverShutdownFuture.compareAndSet(null, shutdownFuture)) {
    +                   log.info("Shutting down {} @ {}", serverName, serverAddress);
    +
    +                   final CompletableFuture<Void> groupShutdownFuture = new CompletableFuture<>();
    +                   if (bootstrap != null) {
    +                           EventLoopGroup group = bootstrap.group();
    +                           if (group != null && !group.isShutdown()) {
    +                                   group.shutdownGracefully(0L, 0L, TimeUnit.MILLISECONDS)
    +                                                   .addListener(finished -> {
    +                                                           if (finished.isSuccess()) {
    +                                                                   groupShutdownFuture.complete(null);
    +                                                           } else {
    +                                                                   groupShutdownFuture.completeExceptionally(finished.cause());
    +                                                           }
    +                                                   });
    +                           } else {
    +                                   groupShutdownFuture.complete(null);
    +                           }
    +                   } else {
    +                           groupShutdownFuture.complete(null);
    +                   }
     
    -           if (bootstrap != null) {
    -                   EventLoopGroup group = bootstrap.group();
    -                   if (group != null) {
    -                           group.shutdownGracefully(0L, 10L, TimeUnit.SECONDS);
    +                   final CompletableFuture<Void> handlerShutdownFuture = new CompletableFuture<>();
    +                   if (handler == null) {
    +                           handlerShutdownFuture.complete(null);
    +                   } else {
    +                           handler.shutdown().whenComplete((result, throwable) -> {
    +                                   if (throwable != null) {
    +                                           handlerShutdownFuture.completeExceptionally(throwable);
    +                                   } else {
    +                                           handlerShutdownFuture.complete(null);
    +                                   }
    +                           });
                        }
    +
    +                   final CompletableFuture<Void> queryExecShutdownFuture = CompletableFuture.runAsync(() -> {
    +                           if (queryExecutor != null) {
    +                                   ExecutorUtils.gracefulShutdown(timeout.toMilliseconds(), TimeUnit.MILLISECONDS, queryExecutor);
    +                           }
    +                   });
    +
    +                   CompletableFuture.allOf(
    +                                   queryExecShutdownFuture, groupShutdownFuture, handlerShutdownFuture
    +                   ).whenComplete((result, throwable) -> {
    +                           if (throwable != null) {
    +                                   shutdownFuture.completeExceptionally(throwable);
    +                           } else {
    +                                   shutdownFuture.complete(null);
    +                           }
    +                   });
                }
    -           serverAddress = null;
    +           return serverShutdownFuture.get();
    --- End diff --
    
    I do think so, because otherwise we could restart a server that we intentionally shut down in the past, right? (This is only done in start(), when we fail to bind to a specific port.)

