[ https://issues.apache.org/jira/browse/FLINK-7880?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16270836#comment-16270836 ]
ASF GitHub Bot commented on FLINK-7880:
---------------------------------------
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/5062#discussion_r153798113
--- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java ---
@@ -251,34 +263,81 @@ private boolean attemptToBind(final int port) throws Throwable {
             throw future.cause();
         } catch (BindException e) {
             log.debug("Failed to start {} on port {}: {}.", serverName, port, e.getMessage());
-            shutdown();
+            try {
+                shutdownServer(Time.seconds(10L)).get();
+            } catch (Exception r) {
+
+                // Here we were seeing this problem:
+                // https://github.com/netty/netty/issues/4357 if we do a get().
+                // this is why we now simply wait a bit so that everything is
+                // shut down and then we check
+
+                log.warn("Problem while shutting down {}: {}", serverName, r.getMessage());
+            }
         }
         // any other type of exception we let it bubble up.
         return false;
     }
     /**
      * Shuts down the server and all related thread pools.
+     * @param timeout The time to wait for the shutdown process to complete.
+     * @return A {@link CompletableFuture} that will be completed upon termination of the shutdown process.
      */
-    public void shutdown() {
-        log.info("Shutting down {} @ {}", serverName, serverAddress);
-
-        if (handler != null) {
-            handler.shutdown();
-            handler = null;
-        }
-
-        if (queryExecutor != null) {
-            queryExecutor.shutdown();
-        }
+    public CompletableFuture<Void> shutdownServer(Time timeout) throws InterruptedException {
+        CompletableFuture<Void> shutdownFuture = new CompletableFuture<>();
+        if (serverShutdownFuture.compareAndSet(null, shutdownFuture)) {
+            log.info("Shutting down {} @ {}", serverName, serverAddress);
+
+            final CompletableFuture<Void> groupShutdownFuture = new CompletableFuture<>();
+            if (bootstrap != null) {
+                EventLoopGroup group = bootstrap.group();
+                if (group != null && !group.isShutdown()) {
+                    group.shutdownGracefully(0L, 0L, TimeUnit.MILLISECONDS)
+                        .addListener(finished -> {
+                            if (finished.isSuccess()) {
+                                groupShutdownFuture.complete(null);
+                            } else {
+                                groupShutdownFuture.completeExceptionally(finished.cause());
+                            }
+                        });
+                } else {
+                    groupShutdownFuture.complete(null);
+                }
+            } else {
+                groupShutdownFuture.complete(null);
+            }
-        if (bootstrap != null) {
-            EventLoopGroup group = bootstrap.group();
-            if (group != null) {
-                group.shutdownGracefully(0L, 10L, TimeUnit.SECONDS);
+            final CompletableFuture<Void> handlerShutdownFuture = new CompletableFuture<>();
+            if (handler == null) {
+                handlerShutdownFuture.complete(null);
+            } else {
+                handler.shutdown().whenComplete((result, throwable) -> {
+                    if (throwable != null) {
+                        handlerShutdownFuture.completeExceptionally(throwable);
+                    } else {
+                        handlerShutdownFuture.complete(null);
+                    }
+                });
             }
+
+            final CompletableFuture<Void> queryExecShutdownFuture = CompletableFuture.runAsync(() -> {
+                if (queryExecutor != null) {
+                    ExecutorUtils.gracefulShutdown(timeout.toMilliseconds(), TimeUnit.MILLISECONDS, queryExecutor);
+                }
+            });
+
+            CompletableFuture.allOf(
+                queryExecShutdownFuture, groupShutdownFuture, handlerShutdownFuture
+            ).whenComplete((result, throwable) -> {
+                if (throwable != null) {
+                    shutdownFuture.completeExceptionally(throwable);
+                } else {
+                    shutdownFuture.complete(null);
+                }
+            });
         }
-        serverAddress = null;
+        return serverShutdownFuture.get();
--- End diff --
If `serverShutdownFuture.get() != null` is meant to signal a shutdown in
progress, then this method should set it back to null before returning.
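
To make the suggestion concrete, here is a minimal sketch of one way the marker could be cleared once the shutdown has finished. It is not the code from the pull request: the class `ShutdownSketch`, the `releaseResources` supplier, and the helpers `isShuttingDown()` and `shutdownsStarted()` are hypothetical names used only for illustration, and the sketch assumes that the reference is indeed meant to mean "shutdown in progress" and that the server may be shut down again later (for example after another failed bind attempt).

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/** Hypothetical stand-in for the server in the diff; not Flink's actual class. */
final class ShutdownSketch {

    // Assumption: non-null only while a shutdown is in progress.
    private final AtomicReference<CompletableFuture<Void>> serverShutdownFuture =
            new AtomicReference<>();

    // Counts how often the real shutdown work was started (illustration only).
    private final AtomicInteger shutdownsStarted = new AtomicInteger();

    // Hypothetical hook standing in for the handler/executor/event loop shutdown.
    private final Supplier<CompletableFuture<Void>> releaseResources;

    ShutdownSketch(Supplier<CompletableFuture<Void>> releaseResources) {
        this.releaseResources = releaseResources;
    }

    CompletableFuture<Void> shutdownServer() {
        final CompletableFuture<Void> shutdownFuture = new CompletableFuture<>();
        if (serverShutdownFuture.compareAndSet(null, shutdownFuture)) {
            shutdownsStarted.incrementAndGet();

            // Clear the marker when this shutdown ends, but only if it is still ours.
            shutdownFuture.whenComplete(
                    (ignored, error) -> serverShutdownFuture.compareAndSet(shutdownFuture, null));

            // Forward the outcome of the resource shutdown to the caller-visible future.
            releaseResources.get().whenComplete((ignored, error) -> {
                if (error != null) {
                    shutdownFuture.completeExceptionally(error);
                } else {
                    shutdownFuture.complete(null);
                }
            });
            return shutdownFuture;
        }

        // Another caller won the race: hand out its future. If that shutdown
        // finished in the meantime, the marker is already null again.
        final CompletableFuture<Void> inProgress = serverShutdownFuture.get();
        return inProgress != null ? inProgress : CompletableFuture.completedFuture(null);
    }

    boolean isShuttingDown() {
        return serverShutdownFuture.get() != null;
    }

    int shutdownsStarted() {
        return shutdownsStarted.get();
    }
}
```

With this shape, concurrent callers share one future while a shutdown is running, and a later call starts a fresh shutdown instead of receiving the stale, already completed future. If the server is never meant to be shut down twice, keeping the future set (and documenting that) would presumably be the simpler choice.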
> flink-queryable-state-java fails with core-dump
> -----------------------------------------------
>
> Key: FLINK-7880
> URL: https://issues.apache.org/jira/browse/FLINK-7880
> Project: Flink
> Issue Type: Bug
> Components: Queryable State, Tests
> Affects Versions: 1.4.0
> Reporter: Till Rohrmann
> Assignee: Kostas Kloudas
> Priority: Critical
> Labels: test-stability
>
> The {{flink-queryable-state-java}} module fails on Travis with a core dump.
> https://travis-ci.org/tillrohrmann/flink/jobs/289949829