GitHub user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5062#discussion_r153798113
  
    --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java ---
    @@ -251,34 +263,81 @@ private boolean attemptToBind(final int port) throws Throwable {
                        throw future.cause();
                } catch (BindException e) {
                        log.debug("Failed to start {} on port {}: {}.", serverName, port, e.getMessage());
    -                   shutdown();
    +                   try {
    +                           shutdownServer(Time.seconds(10L)).get();
    +                   } catch (Exception r) {
    +
    +                           // Here we were seeing this problem:
    +                           // https://github.com/netty/netty/issues/4357 if we do a get().
    +                           // this is why we now simply wait a bit so that everything is
    +                           // shut down and then we check
    +
    +                           log.warn("Problem while shutting down {}: {}", serverName, r.getMessage());
    +                   }
                }
                // any other type of exception we let it bubble up.
                return false;
        }
     
        /**
         * Shuts down the server and all related thread pools.
    +    * @param timeout The time to wait for the shutdown process to complete.
    +    * @return A {@link CompletableFuture} that will be completed upon termination of the shutdown process.
         */
    -   public void shutdown() {
    -           log.info("Shutting down {} @ {}", serverName, serverAddress);
    -
    -           if (handler != null) {
    -                   handler.shutdown();
    -                   handler = null;
    -           }
    -
    -           if (queryExecutor != null) {
    -                   queryExecutor.shutdown();
    -           }
    +   public CompletableFuture<Void> shutdownServer(Time timeout) throws InterruptedException {
    +           CompletableFuture<Void> shutdownFuture = new CompletableFuture<>();
    +           if (serverShutdownFuture.compareAndSet(null, shutdownFuture)) {
    +                   log.info("Shutting down {} @ {}", serverName, serverAddress);
    +
    +                   final CompletableFuture<Void> groupShutdownFuture = new CompletableFuture<>();
    +                   if (bootstrap != null) {
    +                           EventLoopGroup group = bootstrap.group();
    +                           if (group != null && !group.isShutdown()) {
    +                                   group.shutdownGracefully(0L, 0L, TimeUnit.MILLISECONDS)
    +                                                   .addListener(finished -> {
    +                                                           if (finished.isSuccess()) {
    +                                                                   groupShutdownFuture.complete(null);
    +                                                           } else {
    +                                                                   groupShutdownFuture.completeExceptionally(finished.cause());
    +                                                           }
    +                                                   });
    +                           } else {
    +                                   groupShutdownFuture.complete(null);
    +                           }
    +                   } else {
    +                           groupShutdownFuture.complete(null);
    +                   }
     
    -           if (bootstrap != null) {
    -                   EventLoopGroup group = bootstrap.group();
    -                   if (group != null) {
    -                           group.shutdownGracefully(0L, 10L, TimeUnit.SECONDS);
    +                   final CompletableFuture<Void> handlerShutdownFuture = new CompletableFuture<>();
    +                   if (handler == null) {
    +                           handlerShutdownFuture.complete(null);
    +                   } else {
    +                           handler.shutdown().whenComplete((result, throwable) -> {
    +                                   if (throwable != null) {
    +                                           handlerShutdownFuture.completeExceptionally(throwable);
    +                                   } else {
    +                                           handlerShutdownFuture.complete(null);
    +                                   }
    +                           });
                        }
    +
    +                   final CompletableFuture<Void> queryExecShutdownFuture = CompletableFuture.runAsync(() -> {
    +                           if (queryExecutor != null) {
    +                                   ExecutorUtils.gracefulShutdown(timeout.toMilliseconds(), TimeUnit.MILLISECONDS, queryExecutor);
    +                           }
    +                   });
    +
    +                   CompletableFuture.allOf(
    +                                   queryExecShutdownFuture, groupShutdownFuture, handlerShutdownFuture
    +                   ).whenComplete((result, throwable) -> {
    +                           if (throwable != null) {
    +                                   shutdownFuture.completeExceptionally(throwable);
    +                           } else {
    +                                   shutdownFuture.complete(null);
    +                           }
    +                   });
                }
    -           serverAddress = null;
    +           return serverShutdownFuture.get();
    --- End diff --
    
    If `serverShutdownFuture.get() != null` is meant to signal a shutdown in progress, then this method should set it back to null before returning.
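One possible reading of this suggestion, assuming the server can be bound again on another port after `attemptToBind` shuts it down on a `BindException`: clear the marker once the shutdown has completed. Calls made while a shutdown is running then share its future, and a later call starts a fresh shutdown instead of getting back a stale, already-completed one. The sketch below is plain Java with no Flink or Netty types. `ResettableShutdown`, `shutdownSteps`, `runStep` and `shutdownRuns` are hypothetical names, and each step stands in for the event-loop group, handler, and query-executor shutdowns in the diff.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/**
 * Hypothetical, framework-free model of a server whose shutdown is tracked by an
 * AtomicReference<CompletableFuture<Void>>. A non-null reference means "shutdown in
 * progress"; it is cleared again once that shutdown completes.
 */
final class ResettableShutdown {

    // null = no shutdown running; non-null = the future of the running shutdown.
    private final AtomicReference<CompletableFuture<Void>> serverShutdownFuture =
            new AtomicReference<>();

    // Stand-ins for the event-loop group, handler and executor shutdowns (assumption).
    private final List<Supplier<CompletableFuture<Void>>> shutdownSteps;

    // How many times the shutdown steps were actually started (for illustration only).
    final AtomicInteger shutdownRuns = new AtomicInteger();

    ResettableShutdown(List<Supplier<CompletableFuture<Void>>> shutdownSteps) {
        this.shutdownSteps = new ArrayList<>(shutdownSteps);
    }

    CompletableFuture<Void> shutdownServer() {
        while (true) {
            CompletableFuture<Void> candidate = new CompletableFuture<>();
            if (serverShutdownFuture.compareAndSet(null, candidate)) {
                startShutdown(candidate);
                return candidate;
            }
            CompletableFuture<Void> inProgress = serverShutdownFuture.get();
            if (inProgress != null) {
                // Another caller's shutdown is still running: share its future.
                return inProgress;
            }
            // That shutdown finished and cleared the marker between the CAS and
            // the get(); loop and start a fresh shutdown.
        }
    }

    private void startShutdown(CompletableFuture<Void> shutdownFuture) {
        shutdownRuns.incrementAndGet();
        List<CompletableFuture<Void>> steps = new ArrayList<>();
        for (Supplier<CompletableFuture<Void>> step : shutdownSteps) {
            steps.add(runStep(step));
        }
        CompletableFuture.allOf(steps.toArray(new CompletableFuture<?>[0]))
                .whenComplete((ignored, throwable) -> {
                    // Clear the marker before completing, so a caller whose join()
                    // has returned can immediately trigger a new shutdown.
                    serverShutdownFuture.compareAndSet(shutdownFuture, null);
                    if (throwable != null) {
                        shutdownFuture.completeExceptionally(throwable);
                    } else {
                        shutdownFuture.complete(null);
                    }
                });
    }

    // Turns a step that throws synchronously into a failed future, so the marker is
    // still cleared instead of being left set forever.
    private static CompletableFuture<Void> runStep(Supplier<CompletableFuture<Void>> step) {
        try {
            return step.get();
        } catch (RuntimeException e) {
            CompletableFuture<Void> failed = new CompletableFuture<>();
            failed.completeExceptionally(e);
            return failed;
        }
    }
}
```

Resetting inside `whenComplete` rather than at the end of `shutdownServer()` keeps the "in progress" meaning intact: concurrent callers still see the running shutdown, and the marker only goes back to null after every step has finished, successfully or not.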

