[hotfix] Workaround for shutdown deadlock in Netty < 4.0.33

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/16ec3d7e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/16ec3d7e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/16ec3d7e

Branch: refs/heads/master
Commit: 16ec3d7ea12c520c5c86f0721553355cc938c2ae
Parents: fba655a
Author: Till Rohrmann <[email protected]>
Authored: Fri Feb 23 18:13:31 2018 +0100
Committer: Till Rohrmann <[email protected]>
Committed: Fri Feb 23 18:22:10 2018 +0100

----------------------------------------------------------------------
 .../flink/runtime/rest/RestServerEndpoint.java  | 47 +++++++++++++++++---
 1 file changed, 41 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/16ec3d7e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
index bade160..f131ec1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.rest.handler.PipelineErrorHandler;
 import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
 import org.apache.flink.runtime.rest.handler.RouterHandler;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.Preconditions;
 
@@ -59,7 +60,9 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 /**
  * An abstract class for netty-based REST server endpoints.
@@ -287,11 +290,14 @@ public abstract class RestServerEndpoint {
                                });
                                serverChannel = null;
                        }
-                       CompletableFuture<?> groupFuture = new 
CompletableFuture<>();
-                       CompletableFuture<?> childGroupFuture = new 
CompletableFuture<>();
-                       final Time gracePeriod = Time.seconds(10L);
+
+                       final CompletableFuture<Void> channelTerminationFuture 
= new CompletableFuture<>();
 
                        channelFuture.thenRun(() -> {
+                               CompletableFuture<?> groupFuture = new 
CompletableFuture<>();
+                               CompletableFuture<?> childGroupFuture = new 
CompletableFuture<>();
+                               final Time gracePeriod = Time.seconds(10L);
+
                                if (bootstrap != null) {
                                        if (bootstrap.group() != null) {
                                                
bootstrap.group().shutdownGracefully(0L, gracePeriod.toMilliseconds(), 
TimeUnit.MILLISECONDS)
@@ -302,7 +308,10 @@ public abstract class RestServerEndpoint {
                                                                        
groupFuture.completeExceptionally(finished.cause());
                                                                }
                                                        });
+                                       } else {
+                                               groupFuture.complete(null);
                                        }
+
                                        if (bootstrap.childGroup() != null) {
                                                
bootstrap.childGroup().shutdownGracefully(0L, gracePeriod.toMilliseconds(), 
TimeUnit.MILLISECONDS)
                                                        .addListener(finished 
-> {
@@ -312,17 +321,43 @@ public abstract class RestServerEndpoint {
                                                                        
childGroupFuture.completeExceptionally(finished.cause());
                                                                }
                                                        });
+                                       } else {
+                                               childGroupFuture.complete(null);
                                        }
+
                                        bootstrap = null;
                                } else {
                                        // complete the group futures since 
there is nothing to stop
                                        groupFuture.complete(null);
                                        childGroupFuture.complete(null);
                                }
-                       });
 
-                       final CompletableFuture<Void> channelTerminationFuture 
= FutureUtils.completeAll(
-                               Arrays.asList(groupFuture, childGroupFuture));
+                               CompletableFuture<Void> combinedFuture = 
FutureUtils.completeAll(Arrays.asList(groupFuture, childGroupFuture));
+
+                               // TODO: Temporary fix to circumvent shutdown 
bug in Netty < 4.0.33
+                               // See: 
https://github.com/netty/netty/issues/4357
+                               FutureUtils.orTimeout(combinedFuture, 
gracePeriod.toMilliseconds(), TimeUnit.MILLISECONDS);
+
+                               combinedFuture
+                                       .exceptionally(
+                                               (Throwable throwable) -> {
+                                                       if (throwable 
instanceof TimeoutException) {
+                                                               // We ignore 
timeout exceptions because they indicate that Netty's shut down deadlocked
+                                                               log.info("Could 
not properly shut down Netty. Continue shut down of RestServerEndpoint.");
+                                                               return null;
+                                                       } else {
+                                                               throw new 
CompletionException(ExceptionUtils.stripCompletionException(throwable));
+                                                       }
+                                               })
+                                       .whenComplete(
+                                               (Void ignored, Throwable 
throwable) -> {
+                                                       if (throwable != null) {
+                                                               
channelTerminationFuture.completeExceptionally(throwable);
+                                                       } else {
+                                                               
channelTerminationFuture.complete(null);
+                                                       }
+                                               });
+                       });
 
                        return FutureUtils.runAfterwards(
                                channelTerminationFuture,

Reply via email to