[hotfix] Workaround for shutdown deadlock in Netty < 4.0.33
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/16ec3d7e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/16ec3d7e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/16ec3d7e Branch: refs/heads/master Commit: 16ec3d7ea12c520c5c86f0721553355cc938c2ae Parents: fba655a Author: Till Rohrmann <[email protected]> Authored: Fri Feb 23 18:13:31 2018 +0100 Committer: Till Rohrmann <[email protected]> Committed: Fri Feb 23 18:22:10 2018 +0100 ---------------------------------------------------------------------- .../flink/runtime/rest/RestServerEndpoint.java | 47 +++++++++++++++++--- 1 file changed, 41 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/16ec3d7e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java index bade160..f131ec1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java @@ -25,6 +25,7 @@ import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.rest.handler.PipelineErrorHandler; import org.apache.flink.runtime.rest.handler.RestHandlerSpecification; import org.apache.flink.runtime.rest.handler.RouterHandler; +import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FileUtils; import org.apache.flink.util.Preconditions; @@ -59,7 +60,9 @@ import java.util.Collections; import java.util.Comparator; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import 
java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; /** * An abstract class for netty-based REST server endpoints. @@ -287,11 +290,14 @@ public abstract class RestServerEndpoint { }); serverChannel = null; } - CompletableFuture<?> groupFuture = new CompletableFuture<>(); - CompletableFuture<?> childGroupFuture = new CompletableFuture<>(); - final Time gracePeriod = Time.seconds(10L); + + final CompletableFuture<Void> channelTerminationFuture = new CompletableFuture<>(); channelFuture.thenRun(() -> { + CompletableFuture<?> groupFuture = new CompletableFuture<>(); + CompletableFuture<?> childGroupFuture = new CompletableFuture<>(); + final Time gracePeriod = Time.seconds(10L); + if (bootstrap != null) { if (bootstrap.group() != null) { bootstrap.group().shutdownGracefully(0L, gracePeriod.toMilliseconds(), TimeUnit.MILLISECONDS) @@ -302,7 +308,10 @@ public abstract class RestServerEndpoint { groupFuture.completeExceptionally(finished.cause()); } }); + } else { + groupFuture.complete(null); } + if (bootstrap.childGroup() != null) { bootstrap.childGroup().shutdownGracefully(0L, gracePeriod.toMilliseconds(), TimeUnit.MILLISECONDS) .addListener(finished -> { @@ -312,17 +321,43 @@ public abstract class RestServerEndpoint { childGroupFuture.completeExceptionally(finished.cause()); } }); + } else { + childGroupFuture.complete(null); } + bootstrap = null; } else { // complete the group futures since there is nothing to stop groupFuture.complete(null); childGroupFuture.complete(null); } - }); - final CompletableFuture<Void> channelTerminationFuture = FutureUtils.completeAll( - Arrays.asList(groupFuture, childGroupFuture)); + CompletableFuture<Void> combinedFuture = FutureUtils.completeAll(Arrays.asList(groupFuture, childGroupFuture)); + + // TODO: Temporary fix to circumvent shutdown bug in Netty < 4.0.33 + // See: https://github.com/netty/netty/issues/4357 + FutureUtils.orTimeout(combinedFuture, gracePeriod.toMilliseconds(), TimeUnit.MILLISECONDS); + 
+ combinedFuture + .exceptionally( + (Throwable throwable) -> { + if (throwable instanceof TimeoutException) { + // We ignore timeout exceptions because they indicate that Netty's shut down deadlocked + log.info("Could not properly shut down Netty. Continue shut down of RestServerEndpoint."); + return null; + } else { + throw new CompletionException(ExceptionUtils.stripCompletionException(throwable)); + } + }) + .whenComplete( + (Void ignored, Throwable throwable) -> { + if (throwable != null) { + channelTerminationFuture.completeExceptionally(throwable); + } else { + channelTerminationFuture.complete(null); + } + }); + }); return FutureUtils.runAfterwards( channelTerminationFuture,
